feat(python): Add length param to multipart requests #1415

Closed · wants to merge 7 commits
100 changes: 81 additions & 19 deletions python/langsmith/_internal/_background_thread.py
@@ -6,6 +6,7 @@
import logging
import sys
import threading
import time
import weakref
from multiprocessing import cpu_count
from queue import Empty, Queue
@@ -20,6 +21,7 @@

from langsmith import schemas as ls_schemas
from langsmith import utils as ls_utils
from langsmith._internal._compressed_runs import CompressedRuns
from langsmith._internal._constants import (
_AUTO_SCALE_DOWN_NEMPTY_TRIGGER,
_AUTO_SCALE_UP_NTHREADS_LIMIT,
@@ -100,7 +102,8 @@ def _tracing_thread_drain_queue(
def _tracing_thread_drain_compressed_buffer(
client: Client, size_limit: int = 100, size_limit_bytes: int | None = 20_971_520
) -> Tuple[Optional[io.BytesIO], Optional[Tuple[int, int]]]:
assert client.compressed_runs is not None
if client.compressed_runs is None:
return None, None
with client.compressed_runs.lock:
client.compressed_runs.compressor_writer.flush()
current_size = client.compressed_runs.buffer.tell()
@@ -214,6 +217,24 @@ def tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None:
scale_up_qsize_trigger: int = batch_ingest_config["scale_up_qsize_trigger"]
use_multipart = batch_ingest_config.get("use_multipart_endpoint", False)

disable_compression = ls_utils.get_env_var("DISABLE_RUN_COMPRESSION")
if not ls_utils.is_truish(disable_compression) and use_multipart:
if not (client.info.instance_flags or {}).get(
"zstd_compression_enabled", False
):
logger.warning(
"Run compression is not enabled. Please update to the latest "
"version of LangSmith. Falling back to regular multipart ingestion."
)
else:
client._futures = set()
client.compressed_runs = CompressedRuns()
client._data_available_event = threading.Event()
threading.Thread(
target=tracing_control_thread_func_compress_parallel,
args=(weakref.ref(client),),
).start()

sub_threads: List[threading.Thread] = []
# 1 for this func, 1 for getrefcount, 1 for _get_data_type_cached
num_known_refs = 3
@@ -256,6 +277,7 @@ def keep_thread_active() -> bool:
_tracing_thread_handle_batch(
client, tracing_queue, next_batch, use_multipart
)

# drain the queue on exit
while next_batch := _tracing_thread_drain_queue(
tracing_queue, limit=size_limit, block=False
@@ -264,12 +286,20 @@ def keep_thread_active() -> bool:


def tracing_control_thread_func_compress_parallel(
client_ref: weakref.ref[Client],
client_ref: weakref.ref[Client], flush_interval: float = 0.5
) -> None:
client = client_ref()
if client is None:
return

if (
client.compressed_runs is None
or client._data_available_event is None
or client._futures is None
):
logger.error("Required compression attributes not initialized")
return

batch_ingest_config = _ensure_ingest_config(client.info)
size_limit: int = batch_ingest_config["size_limit"]
size_limit_bytes = batch_ingest_config.get("size_limit_bytes", 20_971_520)
@@ -300,35 +330,67 @@ def keep_thread_active() -> bool:
# for now, keep thread alive
return True

last_flush_time = time.monotonic()

while True:
triggered = client._data_available_event.wait(timeout=0.05)
if not keep_thread_active():
break
if not triggered:
continue
client._data_available_event.clear()

data_stream, compressed_runs_info = _tracing_thread_drain_compressed_buffer(
client, size_limit, size_limit_bytes
)
# If data arrived, clear the event and attempt a drain
if triggered:
client._data_available_event.clear()

if data_stream is not None:
try:
future = HTTP_REQUEST_THREAD_POOL.submit(
client._send_compressed_multipart_req,
data_stream,
compressed_runs_info,
data_stream, compressed_runs_info = _tracing_thread_drain_compressed_buffer(
client, size_limit, size_limit_bytes
)
# If we have data, submit the send request
if data_stream is not None:
try:
future = HTTP_REQUEST_THREAD_POOL.submit(
client._send_compressed_multipart_req,
data_stream,
compressed_runs_info,
)
client._futures.add(future)
except RuntimeError:
client._send_compressed_multipart_req(
data_stream,
compressed_runs_info,
)
last_flush_time = time.monotonic()

else:
if (time.monotonic() - last_flush_time) >= flush_interval:
data_stream, compressed_runs_info = (
_tracing_thread_drain_compressed_buffer(
client, size_limit=1, size_limit_bytes=1
)
)
client._futures.add(future)
except RuntimeError:
client._send_compressed_multipart_req(data_stream, compressed_runs_info)
if data_stream is not None:
try:
cf.wait(
[
HTTP_REQUEST_THREAD_POOL.submit(
client._send_compressed_multipart_req,
data_stream,
compressed_runs_info,
)
]
)
except RuntimeError:
client._send_compressed_multipart_req(
data_stream,
compressed_runs_info,
)
last_flush_time = time.monotonic()

# Drain the buffer on exit
# Drain the buffer on exit (final flush)
try:
final_data_stream, compressed_runs_info = (
_tracing_thread_drain_compressed_buffer(
client, size_limit=1, size_limit_bytes=1
) # Force final drain
)
)
if final_data_stream is not None:
try:
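To summarize the new behavior in `tracing_control_thread_func_compress_parallel`: the thread waits briefly on the data-available event, drains and ships the compressed buffer when it fires, and otherwise forces a flush once `flush_interval` seconds have elapsed since the last send. A minimal, standalone sketch of that loop (the `drain_buffer`, `send`, and `should_exit` callables are placeholders, not the actual client internals):

```python
import threading
import time
from typing import Callable, Optional


def flush_loop(
    data_available: threading.Event,
    drain_buffer: Callable[[], Optional[bytes]],  # placeholder for the buffer drain
    send: Callable[[bytes], None],                # placeholder for the multipart send
    should_exit: Callable[[], bool],              # placeholder for the liveness check
    flush_interval: float = 0.5,
) -> None:
    """Sketch of the event-driven plus time-based flush introduced in this PR."""
    last_flush = time.monotonic()
    while not should_exit():
        # Wake up either because data arrived or because the short poll timed out.
        if data_available.wait(timeout=0.05):
            data_available.clear()
            payload = drain_buffer()  # size-limited drain
            if payload is not None:
                send(payload)
                last_flush = time.monotonic()
        elif time.monotonic() - last_flush >= flush_interval:
            payload = drain_buffer()  # forced drain of whatever is buffered
            if payload is not None:
                send(payload)
            last_flush = time.monotonic()
    # The real thread also performs one final forced drain on exit (see above).
```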
17 changes: 1 addition & 16 deletions python/langsmith/_internal/_compressed_runs.py
@@ -1,12 +1,7 @@
import io
import threading

try:
from zstandard import ZstdCompressor # type: ignore[import]

HAVE_ZSTD = True
except ImportError:
HAVE_ZSTD = False
from zstandard import ZstdCompressor # type: ignore[import]

from langsmith import utils as ls_utils

@@ -20,11 +15,6 @@ def __init__(self):
self.lock = threading.Lock()
self.uncompressed_size = 0

if not HAVE_ZSTD:
raise ImportError(
"zstandard package required for compression. "
"Install with 'pip install langsmith[compression]'"
)
self.compressor_writer = ZstdCompressor(
level=compression_level, threads=-1
).stream_writer(self.buffer, closefd=False)
@@ -34,11 +24,6 @@ def reset(self):
self.run_count = 0
self.uncompressed_size = 0

if not HAVE_ZSTD:
raise ImportError(
"zstandard package required for compression. "
"Install with 'pip install langsmith[compression]'"
)
self.compressor_writer = ZstdCompressor(
level=compression_level, threads=-1
).stream_writer(self.buffer, closefd=False)
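With the `HAVE_ZSTD` fallback removed, `zstandard` becomes a hard dependency of this path, and `CompressedRuns` keeps a streaming compressor over a shared in-memory buffer. A rough sketch of that buffering approach (the compression level of 3 is an assumed value; the real one comes from configuration):

```python
import io

from zstandard import ZstdCompressor  # now imported unconditionally

# Shared buffer plus an incremental zstd stream writer, mirroring CompressedRuns.
buffer = io.BytesIO()
writer = ZstdCompressor(level=3, threads=-1).stream_writer(buffer, closefd=False)

# Each serialized multipart chunk is compressed as it is written.
writer.write(b'{"example": "serialized run payload"}')
writer.flush()  # push compressed bytes through to the underlying buffer

print(buffer.tell())  # compressed size so far, which the drain logic compares to limits
```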
8 changes: 4 additions & 4 deletions python/langsmith/_internal/_operations.py
@@ -1,4 +1,4 @@
from __future__ import annotations

Check notice on line 1 in python/langsmith/_internal/_operations.py — GitHub Actions / benchmark

Benchmark results:

create_5_000_run_trees: Mean +- std dev: 648 ms +- 59 ms
create_10_000_run_trees: Mean +- std dev: 1.31 sec +- 0.10 sec
create_20_000_run_trees: Mean +- std dev: 1.32 sec +- 0.08 sec
dumps_class_nested_py_branch_and_leaf_200x400: Mean +- std dev: 713 us +- 31 us
dumps_class_nested_py_leaf_50x100: Mean +- std dev: 25.0 ms +- 0.3 ms
dumps_class_nested_py_leaf_100x200: Mean +- std dev: 104 ms +- 3 ms
dumps_dataclass_nested_50x100: Mean +- std dev: 25.2 ms +- 0.3 ms
dumps_pydantic_nested_50x100: Mean +- std dev: 72.2 ms +- 17.3 ms (WARNING: this result may be unstable; the standard deviation is 24% of the mean)
dumps_pydanticv1_nested_50x100: Mean +- std dev: 194 ms +- 5 ms

Comparison against main:

| Benchmark                                     | main     | changes                |
|-----------------------------------------------|----------|------------------------|
| dumps_pydanticv1_nested_50x100                | 215 ms   | 194 ms: 1.11x faster   |
| create_5_000_run_trees                        | 653 ms   | 648 ms: 1.01x faster   |
| dumps_class_nested_py_leaf_50x100             | 25.1 ms  | 25.0 ms: 1.00x faster  |
| dumps_dataclass_nested_50x100                 | 25.3 ms  | 25.2 ms: 1.00x faster  |
| create_10_000_run_trees                       | 1.32 sec | 1.31 sec: 1.00x faster |
| dumps_class_nested_py_branch_and_leaf_200x400 | 711 us   | 713 us: 1.00x slower   |
| dumps_class_nested_py_leaf_100x200            | 104 ms   | 104 ms: 1.01x slower   |
| create_20_000_run_trees                       | 1.30 sec | 1.32 sec: 1.02x slower |
| dumps_pydantic_nested_50x100                  | 66.8 ms  | 72.2 ms: 1.08x slower  |
| Geometric mean                                | (ref)    | 1.00x faster           |

import itertools
import logging
@@ -201,7 +201,7 @@
(
None,
op.feedback,
"application/json",
f"application/json; length={len(op.feedback)}",
{"Content-Length": str(len(op.feedback))},
),
)
@@ -222,7 +222,7 @@
(
None,
op._none,
"application/json",
f"application/json; length={len(op._none)}",
{"Content-Length": str(len(op._none))},
),
)
@@ -241,7 +241,7 @@
(
None,
valb,
"application/json",
f"application/json; length={len(valb)}",
{"Content-Length": str(len(valb))},
),
),
@@ -263,7 +263,7 @@
(
None,
valb,
content_type,
f"{content_type}; length={len(valb)}",
{"Content-Length": str(len(valb))},
),
)
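The change in this file is the PR's headline feature: each multipart part now advertises its byte size both in a per-part `Content-Length` header and as a `length` parameter on the part's content type. An illustrative sketch of one such part tuple (the part name and payload here are hypothetical):

```python
import json

payload = json.dumps({"name": "my-run", "run_type": "chain"}).encode("utf-8")

part = (
    "post.example-run-id",  # hypothetical part name
    (
        None,      # no filename for the part
        payload,   # raw JSON bytes
        f"application/json; length={len(payload)}",  # length param added by this PR
        {"Content-Length": str(len(payload))},       # existing per-part header
    ),
)
```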
38 changes: 18 additions & 20 deletions python/langsmith/client.py
@@ -77,9 +77,6 @@
from langsmith._internal._background_thread import (
tracing_control_thread_func as _tracing_control_thread_func,
)
from langsmith._internal._background_thread import (
tracing_control_thread_func_compress_parallel as _tracing_control_thread_func_compress_parallel,
)
from langsmith._internal._beta_decorator import warn_beta
from langsmith._internal._compressed_runs import CompressedRuns
from langsmith._internal._constants import (
@@ -476,31 +473,19 @@ def __init__(
# Create a session and register a finalizer to close it
session_ = session if session else requests.Session()
self.session = session_
if ls_utils.get_env_var("USE_RUN_COMPRESSION"):
self._futures: set[cf.Future] = set()
self.compressed_runs: Optional[CompressedRuns] = CompressedRuns()
self._data_available_event = threading.Event()
else:
self.compressed_runs = None

self._info = (
info
if info is None or isinstance(info, ls_schemas.LangSmithInfo)
else ls_schemas.LangSmithInfo(**info)
)
weakref.finalize(self, close_session, self.session)
atexit.register(close_session, session_)
self.compressed_runs: Optional[CompressedRuns] = None
self._data_available_event: Optional[threading.Event] = None
self._futures: Optional[set[cf.Future]] = None
# Initialize auto batching
if auto_batch_tracing and self.compressed_runs is not None:
self.tracing_queue: Optional[PriorityQueue] = None
threading.Thread(
target=_tracing_control_thread_func_compress_parallel,
# arg must be a weakref to self to avoid the Thread object
# preventing garbage collection of the Client object
args=(weakref.ref(self),),
).start()
elif auto_batch_tracing:
self.tracing_queue = PriorityQueue()
if auto_batch_tracing:
self.tracing_queue: Optional[PriorityQueue] = PriorityQueue()

threading.Thread(
target=_tracing_control_thread_func,
@@ -1293,6 +1278,10 @@ def create_run(
if self._pyo3_client is not None:
self._pyo3_client.create_run(run_create)
elif self.compressed_runs is not None:
if self._data_available_event is None:
raise ValueError(
"Run compression is enabled but threading event is not configured"
)
serialized_op = serialize_run_dict("post", run_create)
multipart_form = (
serialized_run_operation_to_multipart_parts_and_context(
@@ -1995,6 +1984,10 @@ def update_run(
)
)
with self.compressed_runs.lock:
if self._data_available_event is None:
raise ValueError(
"Run compression is enabled but threading event is not configured"
)
compress_multipart_parts_and_context(
multipart_form,
self.compressed_runs,
@@ -2030,6 +2023,11 @@ def flush_compressed_runs(self, attempts: int = 3) -> None:
if self.compressed_runs is None:
return

if self._futures is None:
raise ValueError(
"Run compression is enabled but request pool futures is not set"
)

# Attempt to drain and send any remaining data
from langsmith._internal._background_thread import (
HTTP_REQUEST_THREAD_POOL,
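The net effect in `client.py` is that compression is no longer switched on in `__init__` via the `USE_RUN_COMPRESSION` environment variable; `compressed_runs`, `_data_available_event`, and `_futures` start as `None`, and the tracing control thread populates them only when the server reports `zstd_compression_enabled` and `DISABLE_RUN_COMPRESSION` is not set. A hedged sketch of that enablement decision (assuming a `client` object with the attributes shown above):

```python
import threading
import weakref

from langsmith import utils as ls_utils
from langsmith._internal._background_thread import (
    tracing_control_thread_func_compress_parallel,
)
from langsmith._internal._compressed_runs import CompressedRuns


def maybe_enable_compression(client) -> None:
    """Sketch of the check the control thread performs before enabling compression."""
    disable = ls_utils.get_env_var("DISABLE_RUN_COMPRESSION")
    server_supports_zstd = (client.info.instance_flags or {}).get(
        "zstd_compression_enabled", False
    )
    if ls_utils.is_truish(disable) or not server_supports_zstd:
        # Leave the attributes as None; regular multipart ingestion is used.
        return

    # Lazily create the shared state that client methods guard against None.
    client._futures = set()
    client.compressed_runs = CompressedRuns()
    client._data_available_event = threading.Event()
    threading.Thread(
        target=tracing_control_thread_func_compress_parallel,
        args=(weakref.ref(client),),  # weakref so the thread never pins the client
    ).start()
```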
12 changes: 12 additions & 0 deletions python/langsmith/utils.py
@@ -795,3 +795,15 @@ def _get_function_name(fn: Callable, depth: int = 0) -> str:
return _get_function_name(fn.__call__, depth + 1)

return str(fn)


def is_truish(val: Any) -> bool:
"""Check if the value is truish.

Args:
val (Any): The value to check.

Returns:
bool: True if the value is truish, False otherwise.
"""
return val is True or val == "true" or val == "True" or val == "TRUE" or val == "1"
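For reference, `is_truish` matches only the exact values listed in its return statement; other common truthy spellings (and the integer `1`) are not accepted:

```python
from langsmith.utils import is_truish

assert is_truish(True)
assert is_truish("true") and is_truish("True") and is_truish("TRUE")
assert is_truish("1")

# Anything else is rejected, including values that are "truthy" in the Python sense.
assert not is_truish(1)
assert not is_truish("yes")
assert not is_truish(None)
```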
9 changes: 4 additions & 5 deletions python/poetry.lock

Some generated files are not rendered by default.