Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

html report wip #7

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,19 @@ At high traffic levels, ingestion and query performance may be limited in a sing
to larger aggregate numbers by using multiple processes or even distributing across multiple VMs. In this case, the result metrics will need to be aggregated
to get the total QPS and throughput.

This can be supported by the --concurrency (-c) parameter which will spawn separate child processes:

i.e. Run 4 query benchmarks in separate processes with 4 tables each with table name prefix 'my-prefix' (bash):
```bash
./multi.sh 4 my-prefix "uv run bench.py -t 4 -q 10000 --no-ingest --no-index"
```

`uv run bench.py -t 4 -q 10000 -c 4 -p my-prefix --no-ingest --no-index`


This technique can also be used for high-throughput ingestion to multiple tables in parallel using a table prefix per process:
i.e. Ingest to 50 tables in parallel across 10 processes with prefix 'high-throughput' (bash):
```bash
./multi.sh 10 high-throughput "uv run bench.py -t 5"
```

`uv run bench.py -c 10 -t 5 -p high-throughput`


### Reporting

# todo
202 changes: 147 additions & 55 deletions bench.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
import concurrent
import os
import time
import traceback
from concurrent.futures import wait
from multiprocessing import Pool
from pprint import pprint
from typing import Iterable

from lancedb.remote.errors import LanceDBClientError
Expand All @@ -13,44 +16,71 @@
import pyarrow as pa
from datasets import load_dataset, DownloadConfig

from src.cloud.benchmark.util import print_percentiles, await_indices
from cloud.benchmark.report import (
set_result,
save_results,
get_report_dir,
generate_report,
aggregate_results,
)
from src.cloud.benchmark.util import print_percentiles, await_indices, get_percentiles


def run_benchmark(
dataset: str,
num_tables: int,
batch_size: int,
num_queries: int,
ingest: bool,
index: bool,
prefix: str,
reset: bool
dataset: str,
num_tables: int,
batch_size: int,
num_queries: int,
ingest: bool,
index: bool,
prefix: str,
reset: bool,
test_run_id: str,
):
db = lancedb.connect(
uri=os.environ["LANCEDB_DB_URI"],
api_key=os.environ["LANCEDB_API_KEY"],
host_override=os.getenv("LANCEDB_HOST_OVERRIDE"),
region=os.getenv("LANCEDB_REGION", "us-east-1"),
)
try:
print(f"starting test run {test_run_id} with prefix {prefix}")
db = lancedb.connect(
uri=os.environ["LANCEDB_DB_URI"],
api_key=os.environ["LANCEDB_API_KEY"],
host_override=os.getenv("LANCEDB_HOST_OVERRIDE"),
region=os.getenv("LANCEDB_REGION", "us-east-1"),
)
results = {
"params": {
"test_run_id": test_run_id,
"dataset": dataset,
"batch_size": batch_size,
"num_queries": num_queries,
"prefix": prefix,
},
"tables": {},
}

if reset:
_drop_tables(db, num_tables, prefix)

if ingest:
tables = list(_create_tables(db, num_tables, prefix))
_ingest(tables, dataset, batch_size, results)
else:
tables = list(_open_tables(db, num_tables, prefix))

if reset:
_drop_tables(db, num_tables, prefix)
if index:
_create_indices(tables, results)

if ingest:
tables = list(_create_tables(db, num_tables, prefix))
_ingest(tables, dataset, batch_size)
else:
tables = list(_open_tables(db, num_tables, prefix))
_query_tables(tables, num_queries, results)

if index:
_create_indices(tables)
# pprint(results)
save_results(test_run_id, prefix, results)

_query_tables(tables, num_queries)
print("benchmark complete")
except Exception as e:
print(f"benchmark failed with error: {e}")
print(traceback.format_exc())
raise


def _create_tables(
db: lancedb.LanceDBConnection, num_tables: int, prefix: str
db: lancedb.LanceDBConnection, num_tables: int, prefix: str
) -> Iterable[RemoteTable]:
schema = pa.schema(
[
Expand All @@ -77,7 +107,7 @@ def _create_tables(


def _open_tables(
db: lancedb.LanceDBConnection, num_tables: int, prefix: str
db: lancedb.LanceDBConnection, num_tables: int, prefix: str
) -> Iterable[RemoteTable]:
for i in range(num_tables):
table_name = f"{prefix}-{i}"
Expand All @@ -91,24 +121,32 @@ def _drop_tables(db, num_tables, prefix):
db.drop_table(t.name)


def _ingest(tables: list[RemoteTable], dataset: str, batch_size: int):
def _ingest(tables: list[RemoteTable], dataset: str, batch_size: int, results: dict):
start = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=len(tables)) as executor:
futures = []

for table in tables:
futures.append(executor.submit(_ingest_table, dataset, table, batch_size))
results = [future.result() for future in futures]
futures.append(
executor.submit(_ingest_table, dataset, table, batch_size, results)
)
r = [future.result() for future in futures]

total_s = time.time() - start
total_rows = sum(results)
total_rows = sum(r)
rows_s = total_rows / total_s
print(
f"ingested {total_rows} rows in {len(tables)} tables in {total_s:.1f}s. average: {total_rows / total_s:.1f}rows/s"
f"ingested {total_rows} rows in {len(tables)} tables in {total_s:.1f}s. average: {rows_s :.1f}rows/s"
)

set_result(results, "ingest_time_all_tables", total_s)
set_result(results, table.name, "ingest_total_rows_all_tables", total_rows)
set_result(results, table.name, "ingest_avg_rows_s_all_tables", rows_s)

def _ingest_table(dataset: str, table: RemoteTable, batch_size: int) -> int:
# todo: support batch size > 1000

def _ingest_table(
dataset: str, table: RemoteTable, batch_size: int, results: dict
) -> int:
add_times = []
begin = time.time()
total_rows = 0
Expand All @@ -124,10 +162,18 @@ def _ingest_table(dataset: str, table: RemoteTable, batch_size: int) -> int:
)

total_s = int((time.time() - begin))
rows_s = total_rows / total_s
print(
f"{table.name}: ingested {total_rows} rows in {total_s}s. average: {total_rows / total_s:.1f}rows/s"
f"{table.name}: ingested {total_rows} rows in {total_s}s. average: {rows_s :.1f}rows/s"
)
print_percentiles(add_times)
add_percentiles = get_percentiles(add_times)
print_percentiles(add_percentiles)

set_result(results, table.name, "ingest_time_s", total_s)
set_result(results, table.name, "rows_ingested", total_rows)
set_result(results, table.name, "ingest_avg_rows_s", rows_s)
set_result(results, table.name, "ingest_percentiles_ms", add_percentiles)

return total_rows


Expand All @@ -144,20 +190,22 @@ def _split_record_batch(record_batch, batch_size):
yield record_batch.slice(i, min(batch_size, num_rows - i))


def _query_tables(tables: list[RemoteTable], num_queries: int):
def _query_tables(tables: list[RemoteTable], num_queries: int, results: dict):
num_tables = len(tables)
with concurrent.futures.ThreadPoolExecutor(max_workers=num_tables) as executor:
futures = []

for table in tables:
futures.append(executor.submit(_query_table, table, num_queries))
results = [future.result() for future in futures]
futures.append(executor.submit(_query_table, table, num_queries, results))
r = [future.result() for future in futures]

total_queries = num_queries * num_tables
total_qps = sum(results)
total_qps = sum(r)
print(
f"completed {total_queries} queries on {num_tables} tables. average: {total_qps:.1f}QPS"
)
set_result(results, None, "total_queries", total_queries)
set_result(results, None, "total_qps", total_qps)


def _await_index(table: RemoteTable, index_type: str, start_time):
Expand All @@ -167,7 +215,7 @@ def _await_index(table: RemoteTable, index_type: str, start_time):
)


def _create_indices(tables: list[RemoteTable]):
def _create_indices(tables: list[RemoteTable], results):
# create the indices - these will be created async
table_indices = {}
for t in tables:
Expand Down Expand Up @@ -231,7 +279,7 @@ def _convert_dataset(schema, dataset: str, batch_size: int) -> Iterable[pa.Recor
yield b


def _query_table(table: RemoteTable, num_queries: int, warmup_queries=100):
def _query_table(table: RemoteTable, num_queries: int, results: dict, warmup_queries=1):
# log a warning if data is not fully indexed
total_rows = table.count_rows()
for idx in table.list_indices()["indexes"]:
Expand All @@ -258,13 +306,22 @@ def _query_table(table: RemoteTable, num_queries: int, warmup_queries=100):
total_s = int(time.time() - begin)
qps = num_queries / total_s
print(f"{table.name}: query count: {num_queries} average: {qps :.1f}QPS")
print_percentiles(diffs)
percentiles = get_percentiles(diffs)
print_percentiles(percentiles)

set_result(results, table.name, "query_total_time_s", total_s)
set_result(results, table.name, "num_queries", num_queries)
set_result(results, table.name, "query_avg_qps", qps)
set_result(results, table.name, "query_percentiles_ms", percentiles)

return qps


def _query(table: RemoteTable, nprobes=1):
try:
table.search(np.random.standard_normal(1536)).metric("cosine").nprobes(nprobes).select(["openai", "title"]).to_list()
table.search(np.random.standard_normal(1536)).metric("cosine").nprobes(
nprobes
).select(["openai", "title"]).to_list()
except Exception as e:
print(f"{table.name}: error during query: {e}")

Expand Down Expand Up @@ -320,6 +377,13 @@ def main():
default="ldb-cloud-benchmarks",
help="table name prefix",
)
parser.add_argument(
"-c",
"--concurrency",
type=int,
default=1,
help="number of parallel processes to launch",
)
parser.add_argument(
"-r",
"--reset",
Expand All @@ -328,18 +392,46 @@ def main():
action=argparse.BooleanOptionalAction,
help="drop tables before ingesting",
)
parser.add_argument(
"-i",
"--id",
type=str,
default=None,
help="drop tables before ingesting",
)
args = parser.parse_args()
print(args)
run_benchmark(
args.dataset,
args.tables,
args.batch,
args.queries,
args.ingest,
args.index,
args.prefix,
args.reset
)

test_run_id = args.id
if not test_run_id:
test_run_id = str(int(time.time()))

# launch child processes based on configured concurrency
# each child process will operate on unique tables based on table name prefix
with Pool(processes=args.concurrency) as pool:
p_args = [
(
args.dataset,
args.tables,
args.batch,
args.queries,
args.ingest,
args.index,
f"{args.prefix}-{i+1}" if args.concurrency > 1 else args.prefix,
args.reset,
test_run_id,
)
for i in range(args.concurrency)
]
pool.starmap(run_benchmark, p_args)

# aggregate results from concurrent benchmarks
aggregate_results(test_run_id)

# generate the report
report_file = generate_report(get_report_dir(test_run_id))
print(f"finished test run {test_run_id}. ")
print(f"report saved to {report_file}")


if __name__ == "__main__":
Expand Down
9 changes: 0 additions & 9 deletions multi.sh

This file was deleted.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ dependencies = [
"lancedb",
"tqdm",
"pyarrow",
"backoff"
"backoff",
"jinja2"
]

[tool.uv]
Expand Down
Loading