Feat: Add llm observability to python sdk #158

Merged 19 commits on Jan 11, 2025
149 changes: 149 additions & 0 deletions llm_observability_examples.py
@@ -0,0 +1,149 @@
import asyncio
import os
import uuid

import posthog
from posthog.ai import AsyncOpenAI, OpenAI

# Example credentials - replace these with your own or use environment variables
posthog.project_api_key = os.getenv("POSTHOG_PROJECT_API_KEY", "your-project-api-key")
posthog.personal_api_key = os.getenv("POSTHOG_PERSONAL_API_KEY", "your-personal-api-key")
posthog.host = os.getenv("POSTHOG_HOST", "http://localhost:8000") # Or https://app.posthog.com
posthog.debug = True

openai_client = OpenAI(
api_key=os.getenv("OPENAI_API_KEY", "your-openai-api-key"),
posthog_client=posthog,
)

async_openai_client = AsyncOpenAI(
api_key=os.getenv("OPENAI_API_KEY", "your-openai-api-key"),
posthog_client=posthog,
)


def main_sync():
trace_id = str(uuid.uuid4())
print("Trace ID:", trace_id)
distinct_id = "test_distinct_id"
properties = {"test_property": "test_value"}

try:
basic_openai_call(distinct_id, trace_id, properties)
streaming_openai_call(distinct_id, trace_id, properties)
non_instrumented_openai_call()
except Exception as e:
print("Error during OpenAI call:", str(e))


async def main_async():
trace_id = str(uuid.uuid4())
print("Trace ID:", trace_id)
distinct_id = "test_distinct_id"
properties = {"test_property": "test_value"}

try:
await basic_async_openai_call(distinct_id, trace_id, properties)
await streaming_async_openai_call(distinct_id, trace_id, properties)
except Exception as e:
print("Error during OpenAI call:", str(e))


def basic_openai_call(distinct_id, trace_id, properties):
response = openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "You are a complex problem solver."},
{"role": "user", "content": "Explain quantum computing in simple terms."},
],
max_tokens=100,
temperature=0.7,
posthog_distinct_id=distinct_id,
posthog_trace_id=trace_id,
posthog_properties=properties,
)
print(response)
if response and response.choices:
print("OpenAI response:", response.choices[0].message.content)
else:
print("No response or unexpected format returned.")
return response


async def basic_async_openai_call(distinct_id, trace_id, properties):
response = await async_openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "You are a complex problem solver."},
{"role": "user", "content": "Explain quantum computing in simple terms."},
],
max_tokens=100,
temperature=0.7,
posthog_distinct_id=distinct_id,
posthog_trace_id=trace_id,
posthog_properties=properties,
)
if response and hasattr(response, "choices"):
print("OpenAI response:", response.choices[0].message.content)
else:
print("No response or unexpected format returned.")
return response


def streaming_openai_call(distinct_id, trace_id, properties):

response = openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "You are a complex problem solver."},
{"role": "user", "content": "Explain quantum computing in simple terms."},
],
max_tokens=100,
temperature=0.7,
stream=True,
posthog_distinct_id=distinct_id,
posthog_trace_id=trace_id,
posthog_properties=properties,
)

for chunk in response:
if hasattr(chunk, "choices") and chunk.choices and len(chunk.choices) > 0:
print(chunk.choices[0].delta.content or "", end="")

return response


async def streaming_async_openai_call(distinct_id, trace_id, properties):
response = await async_openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "You are a complex problem solver."},
{"role": "user", "content": "Explain quantum computing in simple terms."},
],
max_tokens=100,
temperature=0.7,
stream=True,
posthog_distinct_id=distinct_id,
posthog_trace_id=trace_id,
posthog_properties=properties,
)

async for chunk in response:
if hasattr(chunk, "choices") and chunk.choices and len(chunk.choices) > 0:
print(chunk.choices[0].delta.content or "", end="")

return response


def non_instrumented_openai_call():
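    # Image generation is not instrumented by the PostHog wrapper (only chat completions are), so no $ai_generation event is captured here.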
response = openai_client.images.generate(model="dall-e-3", prompt="A cute baby hedgehog", n=1, size="1024x1024")
print(response)
return response


# HOW TO RUN:
# main_sync() runs by default; comment it out and uncomment the asyncio line below to run the async example instead.

if __name__ == "__main__":
main_sync()

# asyncio.run(main_async())
4 changes: 4 additions & 0 deletions posthog/ai/__init__.py
@@ -0,0 +1,4 @@
from .providers.openai.openai import OpenAI
from .providers.openai.openai_async import AsyncOpenAI

__all__ = ["OpenAI", "AsyncOpenAI"]
167 changes: 167 additions & 0 deletions posthog/ai/providers/openai/openai.py
@@ -0,0 +1,167 @@
import time
import uuid
from typing import Any, Dict, Optional

try:
    import openai
    import openai.resources
except ImportError:
    raise ModuleNotFoundError("Please install the OpenAI SDK to use this feature: 'pip install openai'")

from posthog.ai.utils import call_llm_and_track_usage, get_model_params
from posthog.client import Client as PostHogClient


class OpenAI(openai.OpenAI):
"""
A wrapper around the OpenAI SDK that automatically sends LLM usage events to PostHog.
"""

_ph_client: PostHogClient

    def __init__(self, posthog_client: PostHogClient, **kwargs):
        """
        Args:
            posthog_client: PostHog client used to capture $ai_generation events.
            **kwargs: Any additional keyword args passed through to openai.OpenAI (e.g. api_key="...", organization="xxx").
        """
super().__init__(**kwargs)
self._ph_client = posthog_client
self.chat = WrappedChat(self)


class WrappedChat(openai.resources.chat.Chat):
_client: OpenAI

@property
def completions(self):
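        # Route chat.completions calls through the wrapped resource so usage is captured.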
return WrappedCompletions(self._client)


class WrappedCompletions(openai.resources.chat.completions.Completions):
_client: OpenAI

def create(
self,
posthog_distinct_id: Optional[str] = None,
posthog_trace_id: Optional[str] = None,
posthog_properties: Optional[Dict[str, Any]] = None,
**kwargs: Any,
):
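        # Fall back to a random UUIDv4 when no posthog_distinct_id is supplied (see the review thread below).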
distinct_id = posthog_distinct_id or uuid.uuid4()
Member: How about UUIDv7?

Contributor Author: We use UUIDv4 in the rest of the repo, so trying to keep it consistent.

if kwargs.get("stream", False):
return self._create_streaming(
distinct_id,
posthog_trace_id,
posthog_properties,
**kwargs,
)

return call_llm_and_track_usage(
distinct_id,
self._client._ph_client,
posthog_trace_id,
posthog_properties,
self._client.base_url,
super().create,
**kwargs,
)

def _create_streaming(
self,
distinct_id: str,
posthog_trace_id: Optional[str],
posthog_properties: Optional[Dict[str, Any]],
**kwargs: Any,
):
start_time = time.time()
usage_stats: Dict[str, int] = {}
accumulated_content = []
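        # Request token usage in the final streamed chunk so input/output token counts can be reported to PostHog.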
if "stream_options" not in kwargs:
kwargs["stream_options"] = {}
kwargs["stream_options"]["include_usage"] = True
response = super().create(**kwargs)

def generator():
nonlocal usage_stats
nonlocal accumulated_content
try:
for chunk in response:
if hasattr(chunk, "usage") and chunk.usage:
usage_stats = {
k: getattr(chunk.usage, k, 0)
for k in [
"prompt_tokens",
"completion_tokens",
"total_tokens",
]
}

if hasattr(chunk, "choices") and chunk.choices and len(chunk.choices) > 0:
content = chunk.choices[0].delta.content
if content:
accumulated_content.append(content)

yield chunk

finally:
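                # Emit the PostHog event even if the caller stops consuming the stream early or an error is raised mid-iteration.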
end_time = time.time()
latency = end_time - start_time
output = "".join(accumulated_content)
self._capture_streaming_event(
distinct_id,
posthog_trace_id,
posthog_properties,
kwargs,
usage_stats,
latency,
output,
)

return generator()

def _capture_streaming_event(
self,
distinct_id: str,
posthog_trace_id: Optional[str],
posthog_properties: Optional[Dict[str, Any]],
kwargs: Dict[str, Any],
usage_stats: Dict[str, int],
latency: float,
output: str,
):
if posthog_trace_id is None:
posthog_trace_id = uuid.uuid4()

event_properties = {
"$ai_provider": "openai",
"$ai_model": kwargs.get("model"),
"$ai_model_parameters": get_model_params(kwargs),
"$ai_input": kwargs.get("messages"),
"$ai_output": {
"choices": [
Contributor: Do we want to send an object here instead of the array with choices directly? We've already flattened the structure with additional fields like input/output tokens, so this might be the output itself.

Contributor Author: Although for most use cases "choices" only has one option, the user can ask the LLM for multiple, in which case the choices array represents n options for a response. If we remove the object and just return the array, it would look more like the messages array representing a history.

{
"content": output,
"role": "assistant",
}
]
},
"$ai_request_url": str(self._client.base_url.join("chat/completions")),
"$ai_http_status": 200,
"$ai_input_tokens": usage_stats.get("prompt_tokens", 0),
"$ai_output_tokens": usage_stats.get("completion_tokens", 0),
"$ai_latency": latency,
"$ai_trace_id": posthog_trace_id,
"$ai_posthog_properties": posthog_properties,
}

if hasattr(self._client._ph_client, "capture"):
self._client._ph_client.capture(
distinct_id=distinct_id,
event="$ai_generation",
properties=event_properties,
)
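
For context on the $ai_output review thread above, here is a minimal sketch (illustrative values only, not part of the diff) of the shape the captured property takes with the "choices" wrapper kept: the streaming path above always accumulates a single assistant choice, while a non-streaming request with n > 1 could carry several entries in the same array.

# Illustrative only: the $ai_output shape with the "choices" wrapper kept.
ai_output_single = {
    "choices": [
        {"content": "Quantum computers use qubits, which can represent 0 and 1 at once...", "role": "assistant"},
    ]
}

# Hypothetical shape when a caller asks for n > 1 completions.
ai_output_multiple = {
    "choices": [
        {"content": "First answer option...", "role": "assistant"},
        {"content": "Second answer option...", "role": "assistant"},
    ]
}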