diff --git a/llm_observability_examples.py b/llm_observability_examples.py
new file mode 100644
index 0000000..f6ab46e
--- /dev/null
+++ b/llm_observability_examples.py
@@ -0,0 +1,149 @@
+import os
+import uuid
+
+import posthog
+from posthog.ai import AsyncOpenAI, OpenAI
+
+# Example credentials - replace these with your own or use environment variables
+posthog.project_api_key = os.getenv("POSTHOG_PROJECT_API_KEY", "your-project-api-key")
+posthog.personal_api_key = os.getenv("POSTHOG_PERSONAL_API_KEY", "your-personal-api-key")
+posthog.host = os.getenv("POSTHOG_HOST", "http://localhost:8000")  # Or https://app.posthog.com
+posthog.debug = True
+
+openai_client = OpenAI(
+    api_key=os.getenv("OPENAI_API_KEY", "your-openai-api-key"),
+    posthog_client=posthog,
+)
+
+async_openai_client = AsyncOpenAI(
+    api_key=os.getenv("OPENAI_API_KEY", "your-openai-api-key"),
+    posthog_client=posthog,
+)
+
+
+def main_sync():
+    trace_id = str(uuid.uuid4())
+    print("Trace ID:", trace_id)
+    distinct_id = "test_distinct_id"
+    properties = {"test_property": "test_value"}
+
+    try:
+        basic_openai_call(distinct_id, trace_id, properties)
+        streaming_openai_call(distinct_id, trace_id, properties)
+        non_instrumented_openai_call()
+    except Exception as e:
+        print("Error during OpenAI call:", str(e))
+
+
+async def main_async():
+    trace_id = str(uuid.uuid4())
+    print("Trace ID:", trace_id)
+    distinct_id = "test_distinct_id"
+    properties = {"test_property": "test_value"}
+
+    try:
+        await basic_async_openai_call(distinct_id, trace_id, properties)
+        await streaming_async_openai_call(distinct_id, trace_id, properties)
+    except Exception as e:
+        print("Error during OpenAI call:", str(e))
+
+
+def basic_openai_call(distinct_id, trace_id, properties):
+    response = openai_client.chat.completions.create(
+        model="gpt-4o-mini",
+        messages=[
+            {"role": "system", "content": "You are a complex problem solver."},
+            {"role": "user", "content": "Explain quantum computing in simple terms."},
+        ],
+        max_tokens=100,
+        temperature=0.7,
+        posthog_distinct_id=distinct_id,
+        posthog_trace_id=trace_id,
+        posthog_properties=properties,
+    )
+    print(response)
+    if response and response.choices:
+        print("OpenAI response:", response.choices[0].message.content)
+    else:
+        print("No response or unexpected format returned.")
+    return response
+
+
+async def basic_async_openai_call(distinct_id, trace_id, properties):
+    response = await async_openai_client.chat.completions.create(
+        model="gpt-4o-mini",
+        messages=[
+            {"role": "system", "content": "You are a complex problem solver."},
+            {"role": "user", "content": "Explain quantum computing in simple terms."},
+        ],
+        max_tokens=100,
+        temperature=0.7,
+        posthog_distinct_id=distinct_id,
+        posthog_trace_id=trace_id,
+        posthog_properties=properties,
+    )
+    if response and hasattr(response, "choices"):
+        print("OpenAI response:", response.choices[0].message.content)
+    else:
+        print("No response or unexpected format returned.")
+    return response
+
+
+def streaming_openai_call(distinct_id, trace_id, properties):
+
+    response = openai_client.chat.completions.create(
+        model="gpt-4o-mini",
+        messages=[
+            {"role": "system", "content": "You are a complex problem solver."},
+            {"role": "user", "content": "Explain quantum computing in simple terms."},
+        ],
+        max_tokens=100,
+        temperature=0.7,
+        stream=True,
+        posthog_distinct_id=distinct_id,
+        posthog_trace_id=trace_id,
+        posthog_properties=properties,
+    )
+
+    for chunk in response:
+        if hasattr(chunk, "choices") and chunk.choices and len(chunk.choices) > 0:
+            print(chunk.choices[0].delta.content or "", end="")
+
+    return response
+
+
+async def streaming_async_openai_call(distinct_id, trace_id, properties):
+    response = await async_openai_client.chat.completions.create(
+        model="gpt-4o-mini",
+        messages=[
+            {"role": "system", "content": "You are a complex problem solver."},
+            {"role": "user", "content": "Explain quantum computing in simple terms."},
+        ],
+        max_tokens=100,
+        temperature=0.7,
+        stream=True,
+        posthog_distinct_id=distinct_id,
+        posthog_trace_id=trace_id,
+        posthog_properties=properties,
+    )
+
+    async for chunk in response:
+        if hasattr(chunk, "choices") and chunk.choices and len(chunk.choices) > 0:
+            print(chunk.choices[0].delta.content or "", end="")
+
+    return response
+
+
+def non_instrumented_openai_call():
+    response = openai_client.images.generate(model="dall-e-3", prompt="A cute baby hedgehog", n=1, size="1024x1024")
+    print(response)
+    return response
+
+
+# HOW TO RUN:
+# main_sync() runs by default; to run the async examples, call asyncio.run(main_async()) instead (add "import asyncio" first)
+
+if __name__ == "__main__":
+    main_sync()
+
+# asyncio.run(main_async())
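The example above generates one `trace_id` per entry point and threads it through every helper, which is what groups the resulting `$ai_generation` events into a single trace. As a rough illustration of the intended usage (not part of this PR; the prompts, model, and `summarize_then_translate` name are made up), a multi-step flow would reuse one trace id across calls:

```python
import uuid


def summarize_then_translate(text: str) -> str:
    # Both calls share one trace id, so PostHog can group them into a single trace.
    # `openai_client` is the wrapped client defined in the example script above.
    trace_id = str(uuid.uuid4())
    common = {"posthog_distinct_id": "user_123", "posthog_trace_id": trace_id}

    summary = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": f"Summarize: {text}"}],
        posthog_properties={"step": "summarize"},
        **common,
    )

    translation = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "user", "content": f"Translate to French: {summary.choices[0].message.content}"},
        ],
        posthog_properties={"step": "translate"},
        **common,
    )
    return translation.choices[0].message.content
```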
diff --git a/posthog/ai/__init__.py b/posthog/ai/__init__.py
new file mode 100644
index 0000000..6fbccd1
--- /dev/null
+++ b/posthog/ai/__init__.py
@@ -0,0 +1,4 @@
+from .providers.openai.openai import OpenAI
+from .providers.openai.openai_async import AsyncOpenAI
+
+__all__ = ["OpenAI", "AsyncOpenAI"]
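This `__init__.py` defines the public import surface the example script relies on; consumers only need the package root (a trivial illustration, nothing beyond what `__all__` already exposes):

```python
# The two wrappers re-exported by posthog.ai; importing them requires the
# openai SDK to be installed, otherwise the provider modules below raise a
# ModuleNotFoundError with an install hint.
from posthog.ai import AsyncOpenAI, OpenAI
```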
+ """ + super().__init__(**kwargs) + self._ph_client = posthog_client + self.chat = WrappedChat(self) + + +class WrappedChat(openai.resources.chat.Chat): + _client: OpenAI + + @property + def completions(self): + return WrappedCompletions(self._client) + + +class WrappedCompletions(openai.resources.chat.completions.Completions): + _client: OpenAI + + def create( + self, + posthog_distinct_id: Optional[str] = None, + posthog_trace_id: Optional[str] = None, + posthog_properties: Optional[Dict[str, Any]] = None, + **kwargs: Any, + ): + distinct_id = posthog_distinct_id or uuid.uuid4() + + if kwargs.get("stream", False): + return self._create_streaming( + distinct_id, + posthog_trace_id, + posthog_properties, + **kwargs, + ) + + return call_llm_and_track_usage( + distinct_id, + self._client._ph_client, + posthog_trace_id, + posthog_properties, + self._client.base_url, + super().create, + **kwargs, + ) + + def _create_streaming( + self, + distinct_id: str, + posthog_trace_id: Optional[str], + posthog_properties: Optional[Dict[str, Any]], + **kwargs: Any, + ): + start_time = time.time() + usage_stats: Dict[str, int] = {} + accumulated_content = [] + if "stream_options" not in kwargs: + kwargs["stream_options"] = {} + kwargs["stream_options"]["include_usage"] = True + response = super().create(**kwargs) + + def generator(): + nonlocal usage_stats + nonlocal accumulated_content + try: + for chunk in response: + if hasattr(chunk, "usage") and chunk.usage: + usage_stats = { + k: getattr(chunk.usage, k, 0) + for k in [ + "prompt_tokens", + "completion_tokens", + "total_tokens", + ] + } + + if hasattr(chunk, "choices") and chunk.choices and len(chunk.choices) > 0: + content = chunk.choices[0].delta.content + if content: + accumulated_content.append(content) + + yield chunk + + finally: + end_time = time.time() + latency = end_time - start_time + output = "".join(accumulated_content) + self._capture_streaming_event( + distinct_id, + posthog_trace_id, + posthog_properties, + kwargs, + usage_stats, + latency, + output, + ) + + return generator() + + def _capture_streaming_event( + self, + distinct_id: str, + posthog_trace_id: Optional[str], + posthog_properties: Optional[Dict[str, Any]], + kwargs: Dict[str, Any], + usage_stats: Dict[str, int], + latency: float, + output: str, + ): + if posthog_trace_id is None: + posthog_trace_id = uuid.uuid4() + + event_properties = { + "$ai_provider": "openai", + "$ai_model": kwargs.get("model"), + "$ai_model_parameters": get_model_params(kwargs), + "$ai_input": kwargs.get("messages"), + "$ai_output": { + "choices": [ + { + "content": output, + "role": "assistant", + } + ] + }, + "$ai_request_url": str(self._client.base_url.join("chat/completions")), + "$ai_http_status": 200, + "$ai_input_tokens": usage_stats.get("prompt_tokens", 0), + "$ai_output_tokens": usage_stats.get("completion_tokens", 0), + "$ai_latency": latency, + "$ai_trace_id": posthog_trace_id, + "$ai_posthog_properties": posthog_properties, + } + + if hasattr(self._client._ph_client, "capture"): + self._client._ph_client.capture( + distinct_id=distinct_id, + event="$ai_generation", + properties=event_properties, + ) diff --git a/posthog/ai/providers/openai/openai_async.py b/posthog/ai/providers/openai/openai_async.py new file mode 100644 index 0000000..ff939e0 --- /dev/null +++ b/posthog/ai/providers/openai/openai_async.py @@ -0,0 +1,166 @@ +import time +import uuid +from typing import Any, Dict, Optional + +import openai.resources + +try: + import openai +except ImportError: + raise 
ModuleNotFoundError("Please install the OpenAI SDK to use this feature: 'pip install openai'") + +from posthog.ai.utils import call_llm_and_track_usage_async, get_model_params +from posthog.client import Client as PostHogClient + + +class AsyncOpenAI(openai.AsyncOpenAI): + """ + An async wrapper around the OpenAI SDK that automatically sends LLM usage events to PostHog. + """ + + _ph_client: PostHogClient + + def __init__(self, posthog_client: PostHogClient, **kwargs): + """ + Args: + api_key: OpenAI API key. + posthog_client: If provided, events will be captured via this client instance. + **openai_config: Additional keyword args (e.g. organization="xxx"). + """ + super().__init__(**kwargs) + self._ph_client = posthog_client + self.chat = WrappedChat(self) + + +class WrappedChat(openai.resources.chat.AsyncChat): + _client: AsyncOpenAI + + @property + def completions(self): + return WrappedCompletions(self._client) + + +class WrappedCompletions(openai.resources.chat.completions.AsyncCompletions): + _client: AsyncOpenAI + + async def create( + self, + posthog_distinct_id: Optional[str] = None, + posthog_trace_id: Optional[str] = None, + posthog_properties: Optional[Dict[str, Any]] = None, + **kwargs: Any, + ): + distinct_id = posthog_distinct_id or uuid.uuid4() + + # If streaming, handle streaming specifically + if kwargs.get("stream", False): + return await self._create_streaming( + distinct_id, + posthog_trace_id, + posthog_properties, + **kwargs, + ) + + response = await call_llm_and_track_usage_async( + distinct_id, + self._client._ph_client, + posthog_trace_id, + posthog_properties, + self._client.base_url, + super().create, + **kwargs, + ) + return response + + async def _create_streaming( + self, + distinct_id: str, + posthog_trace_id: Optional[str], + posthog_properties: Optional[Dict[str, Any]], + **kwargs: Any, + ): + start_time = time.time() + usage_stats: Dict[str, int] = {} + accumulated_content = [] + if "stream_options" not in kwargs: + kwargs["stream_options"] = {} + kwargs["stream_options"]["include_usage"] = True + response = await super().create(**kwargs) + + async def async_generator(): + nonlocal usage_stats, accumulated_content + try: + async for chunk in response: + if hasattr(chunk, "usage") and chunk.usage: + usage_stats = { + k: getattr(chunk.usage, k, 0) + for k in [ + "prompt_tokens", + "completion_tokens", + "total_tokens", + ] + } + if hasattr(chunk, "choices") and chunk.choices and len(chunk.choices) > 0: + content = chunk.choices[0].delta.content + if content: + accumulated_content.append(content) + + yield chunk + + finally: + end_time = time.time() + latency = end_time - start_time + output = "".join(accumulated_content) + self._capture_streaming_event( + distinct_id, + posthog_trace_id, + posthog_properties, + kwargs, + usage_stats, + latency, + output, + ) + + return async_generator() + + def _capture_streaming_event( + self, + distinct_id: str, + posthog_trace_id: Optional[str], + posthog_properties: Optional[Dict[str, Any]], + kwargs: Dict[str, Any], + usage_stats: Dict[str, int], + latency: float, + output: str, + ): + if posthog_trace_id is None: + posthog_trace_id = uuid.uuid4() + + event_properties = { + "$ai_provider": "openai", + "$ai_model": kwargs.get("model"), + "$ai_model_parameters": get_model_params(kwargs), + "$ai_input": kwargs.get("messages"), + "$ai_output": { + "choices": [ + { + "content": output, + "role": "assistant", + } + ] + }, + "$ai_http_status": 200, + "$ai_input_tokens": usage_stats.get("prompt_tokens", 0), + 
"$ai_output_tokens": usage_stats.get("completion_tokens", 0), + "$ai_latency": latency, + "$ai_trace_id": posthog_trace_id, + "$ai_posthog_properties": posthog_properties, + "$ai_request_url": str(self._client.base_url.join("chat/completions")), + } + + if hasattr(self._client._ph_client, "capture"): + self._client._ph_client.capture( + distinct_id=distinct_id, + event="$ai_generation", + properties=event_properties, + ) diff --git a/posthog/ai/utils.py b/posthog/ai/utils.py new file mode 100644 index 0000000..4023445 --- /dev/null +++ b/posthog/ai/utils.py @@ -0,0 +1,171 @@ +import time +import uuid +from typing import Any, Callable, Dict, Optional + +from httpx import URL + +from posthog.client import Client as PostHogClient + + +def get_model_params(kwargs: Dict[str, Any]) -> Dict[str, Any]: + """ + Extracts model parameters from the kwargs dictionary. + """ + model_params = {} + for param in [ + "temperature", + "max_tokens", + "top_p", + "frequency_penalty", + "presence_penalty", + "n", + "stop", + "stream", + ]: + if param in kwargs: + model_params[param] = kwargs[param] + return model_params + + +def format_response(response): + """ + Format a regular (non-streaming) response. + """ + output = {"choices": []} + if response is None: + return output + for choice in response.choices: + if choice.message.content: + output["choices"].append( + { + "content": choice.message.content, + "role": choice.message.role, + } + ) + return output + + +def call_llm_and_track_usage( + distinct_id: str, + ph_client: PostHogClient, + posthog_trace_id: Optional[str], + posthog_properties: Optional[Dict[str, Any]], + base_url: Optional[str], + call_method: Callable[..., Any], + **kwargs: Any, +) -> Any: + """ + Common usage-tracking logic for both sync and async calls. + call_method: the llm call method (e.g. 
+async def call_llm_and_track_usage_async(
+    distinct_id: str,
+    ph_client: PostHogClient,
+    posthog_trace_id: Optional[str],
+    posthog_properties: Optional[Dict[str, Any]],
+    base_url: URL,
+    call_async_method: Callable[..., Any],
+    **kwargs: Any,
+) -> Any:
+    start_time = time.time()
+    response = None
+    error = None
+    http_status = 200
+    usage: Dict[str, Any] = {}
+
+    try:
+        response = await call_async_method(**kwargs)
+    except Exception as exc:
+        error = exc
+        http_status = getattr(exc, "status_code", 0)  # default to 0 because it's likely an SDK error
+    finally:
+        end_time = time.time()
+        latency = end_time - start_time
+
+        if posthog_trace_id is None:
+            posthog_trace_id = uuid.uuid4()
+
+        if response and hasattr(response, "usage"):
+            usage = response.usage.model_dump()
+
+        input_tokens = usage.get("prompt_tokens", 0)
+        output_tokens = usage.get("completion_tokens", 0)
+        event_properties = {
+            "$ai_provider": "openai",
+            "$ai_model": kwargs.get("model"),
+            "$ai_model_parameters": get_model_params(kwargs),
+            "$ai_input": kwargs.get("messages"),
+            "$ai_output": format_response(response),
+            "$ai_http_status": http_status,
+            "$ai_input_tokens": input_tokens,
+            "$ai_output_tokens": output_tokens,
+            "$ai_latency": latency,
+            "$ai_trace_id": posthog_trace_id,
+            "$ai_posthog_properties": posthog_properties,
+            "$ai_request_url": str(base_url.join("chat/completions")),
+        }
+
+        # Send the event to PostHog
+        if hasattr(ph_client, "capture") and callable(ph_client.capture):
+            ph_client.capture(
+                distinct_id=distinct_id,
+                event="$ai_generation",
+                properties=event_properties,
+            )
+
+    if error:
+        raise error
+
+    return response
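Because the helpers only require the client to expose a `capture` method and take the LLM call as a plain callable, they can be exercised without the network. The following is a rough, self-contained sketch (not part of this PR; all names are made up) that drives the sync helper with stubs and checks that a `$ai_generation` event is captured:

```python
from types import SimpleNamespace

from posthog.ai.utils import call_llm_and_track_usage


class FakePostHogClient:
    """Records capture calls instead of sending them anywhere."""

    def __init__(self):
        self.events = []

    def capture(self, distinct_id, event, properties):
        self.events.append((distinct_id, event, properties))


def fake_create(**kwargs):
    # Mimics the parts of a chat completion the helpers read:
    # .choices[i].message.{content,role} and .usage.model_dump().
    message = SimpleNamespace(content="42", role="assistant")
    return SimpleNamespace(
        choices=[SimpleNamespace(message=message)],
        usage=SimpleNamespace(model_dump=lambda: {"prompt_tokens": 3, "completion_tokens": 1}),
    )


fake_ph = FakePostHogClient()
call_llm_and_track_usage(
    "user_123",
    fake_ph,
    None,                        # posthog_trace_id: a UUID is generated when omitted
    {"step": "answer"},          # posthog_properties
    "https://api.openai.com/v1",
    fake_create,
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "What is 6 * 7?"}],
    temperature=0.0,
)
assert fake_ph.events[0][1] == "$ai_generation"
assert fake_ph.events[0][2]["$ai_output_tokens"] == 1
```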