Adding streaming support to the benchmarking script (#868)
* initial prototype, haven't fully tested yet

* added ttft calculation

* fixed bug with sending stream request

* cleanup and save ttft results as json files

* final cleanup

* addressed comments
Edwinhr716 authored Nov 6, 2024
1 parent dbf4a16 commit cc7e216
Showing 7 changed files with 151 additions and 20 deletions.
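The headline change is a new streaming request path that records time-to-first-token (TTFT): a monotonic timer starts just before the POST, and the elapsed time is captured when the first non-empty chunk of the streamed response arrives. Below is a minimal, hedged sketch of that pattern only; the endpoint URL, payload, and function name are placeholders rather than the script's actual code, and the real implementation (with error handling and token counting) is in the diff that follows.

import asyncio
import time

import aiohttp


async def measure_ttft(api_url: str, payload: dict) -> float:
    """Return seconds from request start until the first non-empty streamed chunk."""
    start = time.perf_counter()
    ttft = 0.0
    async with aiohttp.ClientSession() as session:
        async with session.post(api_url, json=payload) as response:
            # iter_chunks() yields (bytes, end_of_http_chunk) tuples.
            async for chunk, _ in response.content.iter_chunks():
                if not chunk.strip():
                    continue
                if ttft == 0.0:
                    ttft = time.perf_counter() - start
    return ttft

# Hypothetical usage against a vLLM-style server (address, path, and model name are assumptions):
# asyncio.run(measure_ttft("http://localhost:8000/v1/completions",
#                          {"model": "my-model", "prompt": "Hello", "stream": True}))
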
@@ -139,6 +139,89 @@ def init_errors_map() -> Dict[str, int]:
}
return errors

async def send_stream_request(
    backend: str,
    api_url: str,
    prompt: str,
    prompt_len: int,
    output_len: int,
    best_of: int,
    use_beam_search: bool,
    top_k: int,
    tokenizer: PreTrainedTokenizerBase,
    sax_model: str,
    model: str,
) -> Tuple[Tuple[int, int, float], float, Dict[str, int]]:
    """Sends stream request to server"""
    request_start_time = time.time()
    errors = init_errors_map()

    headers = {"User-Agent": "Benchmark Client"}
    if backend == "vllm":
        pload = {
            "model": model,
            "prompt": prompt,
            "n": 1,
            "best_of": best_of,
            "use_beam_search": use_beam_search,
            "temperature": 0.0 if use_beam_search else 1.0,
            "top_p": 1.0,
            "max_tokens": output_len,
            "ignore_eos": True,
            "stream": True,
        }
    else:
        raise ValueError(f"Unknown backend: {backend}")

    ttft = 0.0
    st = time.perf_counter()
    output = ""
    timeout = aiohttp.ClientTimeout(total=CLIENT_TIMEOUT_SEC)
    async with aiohttp.ClientSession(timeout=timeout, trust_env=True) as session:
        try:
            async with session.post(api_url, headers=headers, json=pload, ssl=False) as response:
                async for chunk_bytes in response.content.iter_chunks():
                    chunk_bytes = chunk_bytes[0].strip()
                    if not chunk_bytes:
                        continue
                    timestamp = time.perf_counter()
                    # First token
                    if ttft == 0.0:
                        ttft = timestamp - st

                    if chunk_bytes.decode("utf-8")[6:] != "[DONE]":
                        if backend == "vllm":
                            output += json.loads(chunk_bytes.decode("utf-8")[6:])["choices"][0]["text"]
        except aiohttp.client_exceptions.ClientConnectorError as client_err:
            errors["ClientConnectorError"] += 1
            print(f"ClientConnectorError: {client_err}")
            return None, None, errors
        except asyncio.TimeoutError as timeout_err:
            errors["TimeoutError"] += 1
            print(f"TimeoutError: {timeout_err}")
            return None, None, errors
        except aiohttp.client_exceptions.ClientOSError as e:
            errors["ClientOSError"] += 1
            print(f"ClientOSError: {e}")
            return None, None, errors
        except aiohttp.client_exceptions.ContentTypeError as e:
            print(f"ContentTypeError: {e}, response: {response}")
            errors["ContentTypeError"] += 1
            return None, None, errors
        except aiohttp.client_exceptions.ServerDisconnectedError as e:
            errors["ServerDisconnectedError"] += 1
            print(f"ServerDisconnectedError: {e}")
            return None, None, errors
        except Exception as e:
            print(f"Unknown error {e}")
            errors["unknown_error"] += 1
            return None, None, errors
    request_end_time = time.time()
    output_token_ids = tokenizer(output).input_ids
    output_len = len(output_token_ids)
    request_latency = (prompt_len, output_len, (request_end_time - request_start_time))
    return request_latency, ttft, None

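A note on the chunk handling above: the [6:] slice assumes each streamed chunk is a server-sent-events line of the form "data: {json}", with a final "data: [DONE]" sentinel, which matches the OpenAI-compatible streaming convention that vLLM uses. A small, hedged illustration of that parsing step in isolation (the sample chunk contents are made up, and parse_stream_chunk is not a function from the script):

import json
from typing import Optional

def parse_stream_chunk(chunk_bytes: bytes) -> Optional[str]:
    """Return the text delta from one SSE chunk, or None for the [DONE] sentinel."""
    line = chunk_bytes.decode("utf-8").strip()
    payload = line[len("data: "):]  # same as the [6:] slice in send_stream_request
    if payload == "[DONE]":
        return None
    return json.loads(payload)["choices"][0]["text"]

print(parse_stream_chunk(b'data: {"choices": [{"text": "Hello"}]}'))  # -> Hello
print(parse_stream_chunk(b'data: [DONE]'))                            # -> None
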
async def send_request(
backend: str,
api_url: str,
@@ -151,7 +234,7 @@ async def send_request(
tokenizer: PreTrainedTokenizerBase,
sax_model: str,
model: str,
) -> Tuple[Tuple[int, int, float], Dict[str, int]]:
) -> Tuple[Tuple[int, int, float], float, Dict[str, int]]:
"""Sends request to server."""
request_start_time = time.time()
errors = init_errors_map()
@@ -237,27 +320,27 @@ async def send_request(
except aiohttp.client_exceptions.ClientConnectorError as client_err:
errors["ClientConnectorError"] += 1
print(f"ClientConnectorError: {client_err}")
return None, errors
return None, None, errors
except asyncio.TimeoutError as timeout_err:
errors["TimeoutError"] += 1
print(f"TimeoutError: {timeout_err}")
return None, errors
return None, None, errors
except aiohttp.client_exceptions.ClientOSError as e:
errors["ClientOSError"] += 1
print(f"ClientOSError: {e}")
return None, errors
return None, None, errors
except aiohttp.client_exceptions.ContentTypeError as e:
print(f"ContentTypeError: {e}, response: {response}")
errors["ContentTypeError"] += 1
return None, errors
return None, None, errors
except aiohttp.client_exceptions.ServerDisconnectedError as e:
errors["ServerDisconnectedError"] += 1
print(f"ServerDisconnectedError: {e}")
return None, errors
return None, None, errors
except Exception as e:
print(f"Unknown error {e}")
errors["unknown_error"] += 1
return None, errors
return None, None, errors

request_end_time = time.time()
# Naive HF transformers generation and TensorRT-LLM generation stops at EOS
@@ -291,14 +374,14 @@ async def send_request(
prompt_length_metric.observe(prompt_len)
response_length_metric.observe(output_len)

return request_latency, None
return request_latency, None, None

async def benchmark(
args: argparse.Namespace,
api_url: str,
tokenizer: PreTrainedTokenizerBase,
model: str,
) -> Tuple[List[Tuple[int, int, float]], Dict[str, int]]:
) -> Tuple[List[Tuple[int, int, float]], List[float], Dict[str, int]]:
"""Runs benchmark with asynchronous requests."""
input_requests = sample_requests(
args.dataset,
@@ -312,8 +395,9 @@ async def benchmark(
tasks: List[asyncio.Task] = []
async for request in get_request(input_requests, args.request_rate):
prompt, prompt_len, output_len = request
task = asyncio.create_task(
send_request(
if args.stream_request:
task = asyncio.create_task(
send_stream_request(
args.backend,
api_url,
prompt,
@@ -326,21 +410,40 @@ async def send_request(
args.sax_model,
model,
)
)
)
else:
task = asyncio.create_task(
send_request(
args.backend,
api_url,
prompt,
prompt_len,
output_len,
args.best_of,
args.use_beam_search,
args.top_k,
tokenizer,
args.sax_model,
model,
)
)
tasks.append(task)
results = await asyncio.gather(*tasks)
combined_latencies = []
combined_ttfts = []
combined_errors = init_errors_map()
for latency, errors in results:
for latency, ttft, errors in results:
if latency:
combined_latencies.append(latency)
if errors:
for err, count in errors.items():
combined_errors[err] = combined_errors[err] + count
if ttft:
combined_ttfts.append(ttft)

benchmark_duration = time.time() - benchmark_start_time
print_and_save_result(args, benchmark_duration, len(input_requests), model, combined_latencies, combined_errors)
return combined_latencies, combined_errors
print_and_save_result(args, benchmark_duration, len(input_requests), model, combined_latencies, combined_ttfts, combined_errors)
return combined_latencies, combined_ttfts, combined_errors

def save_json_results(args: argparse.Namespace, benchmark_result, server_metrics, model, errors):
# Setup
@@ -543,7 +646,7 @@ def get_stats_for_set(name, description, points):
f'p99_{name}': p99,
}

def print_and_save_result(args: argparse.Namespace, benchmark_duration, total_requests, model, request_latencies, errors):
def print_and_save_result(args: argparse.Namespace, benchmark_duration, total_requests, model, request_latencies, ttfts, errors):
benchmark_result = {}

print(f"====Result for Model: {model}====")
@@ -578,7 +681,9 @@ def print_and_save_result(args: argparse.Namespace, benchmark_duration, total_re
print(f"Tokens/min: {tokens_per_min:.2f}")
benchmark_result['total_tokens'] = int(total_tokens)
benchmark_result['tokens_per_min'] = tokens_per_min

ttft_stats = {}
if args.stream_request:
ttft_stats = get_stats_for_set("TTFT", "Time to First Token (s)", ttfts)
if args.machine_cost:
print(
"Cost $/1k tokens:"
@@ -591,7 +696,7 @@ def print_and_save_result(args: argparse.Namespace, benchmark_duration, total_re
latency / (prompt_len + output_len)
for prompt_len, output_len, latency in request_latencies
])),

**ttft_stats,
# NOTE: The latency below includes requests awaiting time on server side.
# It's not comparable with the model inference latency for batch size 1.
**(get_stats_for_set("latency", "milliseconds/request (includes waiting time on server)" ,[1000 * latency for _, _, latency in request_latencies])),
@@ -648,6 +753,7 @@ async def main(args: argparse.Namespace):

# Summarize results
combined_latencies = []
combined_ttfts = []
combined_errors = {
"ClientConnectorError": 0,
"TimeoutError": 0,
@@ -656,14 +762,15 @@ async def main(args: argparse.Namespace):
"unknown_error": 0,
"ServerDisconnectedError": 0,
}
for latencies, errors in results:
for latencies, ttfts, errors in results:
combined_latencies.extend(latencies)
combined_ttfts.extend(ttfts)
for k, v in errors.items():
combined_errors[k] = combined_errors[k] + v

benchmark_duration_all_models = time.time() - benchmark_start_time
if args.save_aggregated_result:
print_and_save_result(args, benchmark_duration_all_models, len(models)*args.num_prompts, f"ALL-{len(models)}-MODELS", combined_latencies, combined_errors)
print_and_save_result(args, benchmark_duration_all_models, len(models)*args.num_prompts, f"ALL-{len(models)}-MODELS", combined_latencies, combined_ttfts, combined_errors)

if __name__ == "__main__":
parser = argparse.ArgumentParser(
@@ -698,6 +805,11 @@ async def main(args: argparse.Namespace):
type=str,
help="Comma separated list of models to benchmark.",
)
parser.add_argument(
"--stream-request",
action="store_true",
help="Whether to stream the request. Needed for TTFT metric",
)
parser.add_argument(
"--tokenizer",
type=str,
@@ -44,6 +44,9 @@ for request_rate in $(echo $REQUEST_RATES | tr ',' ' '); do
if [[ "$SAVE_AGGREGATED_RESULT" = "true" ]]; then
PYTHON_OPTS="$PYTHON_OPTS --save-aggregated-result"
fi
if [[ "$STREAM_REQUEST" = "true" ]]; then
PYTHON_OPTS="$PYTHON_OPTS --stream-request"
fi
$PYTHON $PYTHON_OPTS > $output_file
cat $output_file
sleep 5 # wait 5 seconds before next run
1 change: 1 addition & 0 deletions benchmarks/benchmark/tools/profile-generator/main.tf
@@ -86,4 +86,5 @@ module "latency-profile" {
file_prefix = var.file_prefix
save_aggregated_result = var.save_aggregated_result
models = var.models
stream_request = var.stream_request
}
@@ -69,6 +69,7 @@ resource "kubernetes_manifest" "latency-profile-generator" {
file_prefix = var.file_prefix
save_aggregated_result = var.save_aggregated_result
models = var.models
stream_request = var.stream_request
}))
}

@@ -52,6 +52,8 @@ spec:
value: ${file_prefix}
- name: SAVE_AGGREGATED_RESULT
value: ${save_aggregated_result}
- name: STREAM_REQUEST
value: ${stream_request}
%{ for hugging_face_token_secret in hugging_face_token_secret_list ~}
- name: HF_TOKEN
valueFrom:
@@ -207,4 +207,10 @@ variable "save_aggregated_result" {
description = "Whether to save aggregated result, useful when benchmarking multiple models."
type = bool
default = false
}

variable "stream_request" {
description = "Whether to stream the request. Needed for TTFT metric"
type = bool
default = false
}
6 changes: 6 additions & 0 deletions benchmarks/benchmark/tools/profile-generator/variables.tf
@@ -197,4 +197,10 @@ variable "save_aggregated_result" {
description = "Whether to save aggregated result, useful when benchmarking multiple models."
type = bool
default = false
}

variable "stream_request" {
description = "Whether to stream the request. Needed for TTFT metric"
type = bool
default = false
}
