Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Embeddings + Personless events + Destructure property JSON #160

Merged
merged 8 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 3.8.0 - 2025-01-14

1. Add LLM Observability with support for OpenAI and Langchain callbacks.

## 3.7.5 - 2025-01-03

1. Add `distinct_id` to group_identify
Expand Down
43 changes: 40 additions & 3 deletions llm_observability_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@
def main_sync():
trace_id = str(uuid.uuid4())
print("Trace ID:", trace_id)
distinct_id = "test_distinct_id"
distinct_id = "test2_distinct_id"
properties = {"test_property": "test_value"}

try:
basic_openai_call(distinct_id, trace_id, properties)
streaming_openai_call(distinct_id, trace_id, properties)
non_instrumented_openai_call()
embedding_openai_call(distinct_id, trace_id, properties)
image_openai_call()
except Exception as e:
print("Error during OpenAI call:", str(e))

Expand All @@ -44,6 +45,8 @@ async def main_async():
try:
await basic_async_openai_call(distinct_id, trace_id, properties)
await streaming_async_openai_call(distinct_id, trace_id, properties)
await embedding_async_openai_call(distinct_id, trace_id, properties)
await image_async_openai_call()
except Exception as e:
print("Error during OpenAI call:", str(e))

Expand Down Expand Up @@ -134,12 +137,46 @@ async def streaming_async_openai_call(distinct_id, trace_id, properties):
return response


def non_instrumented_openai_call():
# none instrumented
def image_openai_call():
    """Generate a DALL-E 3 image and return the raw API response.

    Note: image generation is not wrapped by the PostHog client, so no
    usage event is captured for this call.
    """
    result = openai_client.images.generate(
        model="dall-e-3",
        prompt="A cute baby hedgehog",
        n=1,
        size="1024x1024",
    )
    print(result)
    return result


# not instrumented: no PostHog usage event is captured for image generation
async def image_async_openai_call():
    """Generate a DALL-E 3 image via the async client and return the raw response.

    Note: image generation is not wrapped by the PostHog client, so no
    usage event is captured for this call.
    """
    result = await async_openai_client.images.generate(
        model="dall-e-3",
        prompt="A cute baby hedgehog",
        n=1,
        size="1024x1024",
    )
    print(result)
    return result


def embedding_openai_call(posthog_distinct_id, posthog_trace_id, posthog_properties):
    """Create an embedding for a fixed sentence, tagging the PostHog usage
    event with the supplied distinct id, trace id, and extra properties."""
    request_kwargs = {
        "input": "The hedgehog is cute",
        "model": "text-embedding-3-small",
        "posthog_distinct_id": posthog_distinct_id,
        "posthog_trace_id": posthog_trace_id,
        "posthog_properties": posthog_properties,
    }
    result = openai_client.embeddings.create(**request_kwargs)
    print(result)
    return result


async def embedding_async_openai_call(posthog_distinct_id, posthog_trace_id, posthog_properties):
    """Async variant of the embedding example: create an embedding for a
    fixed sentence, tagging the PostHog usage event with the supplied ids
    and extra properties."""
    request_kwargs = {
        "input": "The hedgehog is cute",
        "model": "text-embedding-3-small",
        "posthog_distinct_id": posthog_distinct_id,
        "posthog_trace_id": posthog_trace_id,
        "posthog_properties": posthog_properties,
    }
    result = await async_openai_client.embeddings.create(**request_kwargs)
    print(result)
    return result


# HOW TO RUN:
# comment out one of these to run the other

Expand Down
6 changes: 3 additions & 3 deletions posthog/ai/langchain/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def __init__(
self._client = client
self._distinct_id = distinct_id
self._trace_id = trace_id
self._properties = properties
self._properties = properties or {}
self._runs = {}
self._parent_tree = {}

Expand Down Expand Up @@ -171,8 +171,8 @@ def on_llm_end(
"$ai_output_tokens": output_tokens,
"$ai_latency": latency,
"$ai_trace_id": trace_id,
"$ai_posthog_properties": self._properties,
"$ai_base_url": run.get("base_url"),
**self._properties,
}
if self._distinct_id is None:
event_properties["$process_person_profile"] = False
Expand Down Expand Up @@ -216,8 +216,8 @@ def on_llm_error(
"$ai_http_status": _get_http_status(error),
"$ai_latency": latency,
"$ai_trace_id": trace_id,
"$ai_posthog_properties": self._properties,
"$ai_base_url": run.get("base_url"),
**self._properties,
}
if self._distinct_id is None:
event_properties["$process_person_profile"] = False
Expand Down
89 changes: 80 additions & 9 deletions posthog/ai/openai/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def __init__(self, posthog_client: PostHogClient, **kwargs):
super().__init__(**kwargs)
self._ph_client = posthog_client
self.chat = WrappedChat(self)
self.embeddings = WrappedEmbeddings(self)


class WrappedChat(openai.resources.chat.Chat):
Expand All @@ -50,18 +51,19 @@ def create(
posthog_properties: Optional[Dict[str, Any]] = None,
**kwargs: Any,
):
distinct_id = posthog_distinct_id or uuid.uuid4()
if posthog_trace_id is None:
posthog_trace_id = uuid.uuid4()

if kwargs.get("stream", False):
return self._create_streaming(
distinct_id,
posthog_distinct_id,
posthog_trace_id,
posthog_properties,
**kwargs,
)

return call_llm_and_track_usage(
distinct_id,
posthog_distinct_id,
self._client._ph_client,
posthog_trace_id,
posthog_properties,
Expand All @@ -72,7 +74,7 @@ def create(

def _create_streaming(
self,
distinct_id: str,
posthog_distinct_id: Optional[str],
posthog_trace_id: Optional[str],
posthog_properties: Optional[Dict[str, Any]],
**kwargs: Any,
Expand Down Expand Up @@ -112,7 +114,7 @@ def generator():
latency = end_time - start_time
output = "".join(accumulated_content)
self._capture_streaming_event(
distinct_id,
posthog_distinct_id,
posthog_trace_id,
posthog_properties,
kwargs,
Expand All @@ -125,7 +127,7 @@ def generator():

def _capture_streaming_event(
self,
distinct_id: str,
posthog_distinct_id: Optional[str],
posthog_trace_id: Optional[str],
posthog_properties: Optional[Dict[str, Any]],
kwargs: Dict[str, Any],
Expand All @@ -149,18 +151,87 @@ def _capture_streaming_event(
}
]
},
"$ai_request_url": str(self._client.base_url.join("chat/completions")),
"$ai_http_status": 200,
"$ai_input_tokens": usage_stats.get("prompt_tokens", 0),
"$ai_output_tokens": usage_stats.get("completion_tokens", 0),
"$ai_latency": latency,
"$ai_trace_id": posthog_trace_id,
"$ai_posthog_properties": posthog_properties,
"$ai_base_url": str(self._client.base_url),
**posthog_properties,
}

if posthog_distinct_id is None:
event_properties["$process_person_profile"] = False

if hasattr(self._client._ph_client, "capture"):
self._client._ph_client.capture(
distinct_id=distinct_id,
distinct_id=posthog_distinct_id or posthog_trace_id,
event="$ai_generation",
properties=event_properties,
)


class WrappedEmbeddings(openai.resources.embeddings.Embeddings):
    """OpenAI Embeddings resource wrapped to emit a PostHog ``$ai_embedding``
    usage event for every ``create`` call."""

    # The wrapping OpenAI client; provides base_url and the PostHog client handle.
    _client: OpenAI

    def create(
        self,
        posthog_distinct_id: Optional[str] = None,
        posthog_trace_id: Optional[str] = None,
        posthog_properties: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ):
        """
        Create an embedding using OpenAI's 'embeddings.create' method, but also track usage in PostHog.

        Args:
            posthog_distinct_id: Optional ID to associate with the usage event.
            posthog_trace_id: Optional trace UUID for linking events.
            posthog_properties: Optional dictionary of extra properties to include in the event.
            **kwargs: Any additional parameters for the OpenAI Embeddings API.

        Returns:
            The response from OpenAI's embeddings.create call.
        """
        if posthog_trace_id is None:
            posthog_trace_id = uuid.uuid4()

        start_time = time.time()
        response = super().create(**kwargs)
        end_time = time.time()

        # Extract usage statistics if available
        usage_stats = {}
        if hasattr(response, "usage") and response.usage:
            usage_stats = {
                "prompt_tokens": getattr(response.usage, "prompt_tokens", 0),
                "total_tokens": getattr(response.usage, "total_tokens", 0),
            }

        latency = end_time - start_time

        # Build the event properties. Guard the spread with `or {}`: the
        # parameter defaults to None, and `**None` raises TypeError — the
        # langchain callback handles this the same way (`properties or {}`).
        event_properties = {
            "$ai_provider": "openai",
            "$ai_model": kwargs.get("model"),
            "$ai_input": kwargs.get("input"),
            "$ai_http_status": 200,
            "$ai_input_tokens": usage_stats.get("prompt_tokens", 0),
            "$ai_latency": latency,
            "$ai_trace_id": posthog_trace_id,
            "$ai_base_url": str(self._client.base_url),
            **(posthog_properties or {}),
        }

        # Personless event when no distinct id was supplied.
        if posthog_distinct_id is None:
            event_properties["$process_person_profile"] = False

        # Send capture event for embeddings
        if hasattr(self._client._ph_client, "capture"):
            self._client._ph_client.capture(
                distinct_id=posthog_distinct_id or posthog_trace_id,
                event="$ai_embedding",
                properties=event_properties,
            )

        return response
Loading
Loading