From f5476fdbae92ec870fe292692be345c60ce09e0b Mon Sep 17 00:00:00 2001
From: cutecutecat
Date: Mon, 4 Nov 2024 14:41:45 +0800
Subject: [PATCH 1/2] feat: nprob and parallel sample

Signed-off-by: cutecutecat
---
 bench/README.md | 27 +++++----------
 bench/bench.py  |  5 ++-
 bench/index.py  |  2 +-
 bench/train.py  | 90 ++++++++++++++++++++++++++++++++++++++++++++++---
 4 files changed, 99 insertions(+), 25 deletions(-)

diff --git a/bench/README.md b/bench/README.md
index c0ae63c..4f3643c 100644
--- a/bench/README.md
+++ b/bench/README.md
@@ -27,21 +27,6 @@ PGPASSWORD=123 psql -h 127.0.0.1 -U postgres -c "CREATE USER bench WITH PASSWORD
 PGPASSWORD=123 psql -h 127.0.0.1 -U postgres -c "ALTER ROLE bench SUPERUSER;"
 ```
 
-## Download Data
-
-```shell
-export AWS_ACCESS_KEY_ID=xxx
-export AWS_SECRET_ACCESS_KEY=xxx
-
-aws s3 cp s3://pgvecto.rs-bench/sift_128_1m/sift.hdf5 $(whoami)/sift/sift.hdf5
-aws s3 cp s3://pgvecto.rs-bench/gist_960_1m/gist.hdf5 $(whoami)/gist/gist.hdf5
-aws s3 cp s3://pgvecto.rs-bench/glove_200_1m/glove.hdf5 $(whoami)/glove/glove.hdf5
-aws s3 cp s3://pgvecto.rs-bench/openai_1536_500k/openai.hdf5 $(whoami)/openai/openai.hdf5
-aws s3 cp s3://pgvecto.rs-bench/cohere-768-1m-2022/cohere.hdf5 $(whoami)/cohere_1m_22/cohere_1m_22.hdf5
-aws s3 cp s3://pgvecto.rs-bench/cohere-1024-1m-2023/cohere-1m-23.hdf5 $(whoami)/cohere_1m_23/cohere_1m_23.hdf5
-aws s3 cp s3://pgvecto.rs-bench/cohere-1024-10m-2023/cohere-10m-23.hdf5 $(whoami)/cohere_10m_23/cohere_10m_23.hdf5
-```
-
 ## Run Bench
 
 Options for `-n`:
@@ -56,15 +41,21 @@ Options for `-n`:
 
 ```shell
 # pip install pgvector numpy faiss-cpu psycopg h5py tqdm
+# If using GPU for train.py:
+# conda install pytorch::faiss-gpu
+
 # dump table embedding column to a local h5 file["train"]
 python dump.py -n sift -o sift.h5 -c embedding -d 128
 
 # external k-means
-python train.py -i sift.hdf5 -o sift_centroids_4096
+python train.py -i sift.hdf5 -o sift_centroids_4096 -m l2
 
 # build index (w/wo external centroids)
-python index.py -n sift -c sift_centroids_4096.npy -i sift.hdf5
+## with external centroids
+python index.py -n sift -c sift_centroids_4096.npy -i sift.hdf5 -m l2 -p 123 -k 4096 -d 128 -w 4
+## without external centroids
+## python index.py -n sift -i sift.hdf5 -m l2 -p 123 -k 4096 -d 128 -w 4
 
 # bench
-python bench.py -n sift -i sift.hdf5
+python bench.py -n sift -i sift.hdf5 --nprob 100
 ```
diff --git a/bench/bench.py b/bench/bench.py
index 26423b5..ced1c53 100644
--- a/bench/bench.py
+++ b/bench/bench.py
@@ -22,6 +22,9 @@ def build_arg_parse():
     parser.add_argument(
         "-p", "--password", help="Database password", default="password"
     )
+    parser.add_argument(
+        "--nprob", help="value of rabbithole.nprobe to set for queries", default=300, type=int
+    )
     return parser
@@ -84,6 +87,6 @@ def bench(name, test, answer, metric_ops, conn):
     else:
         raise ValueError
     conn = create_connection(args.password)
-    conn.execute("SET rabbithole.nprobe=300")
+    conn.execute(f"SET rabbithole.nprobe={args.nprob}")
 
     bench(args.name, test, answer, metric_ops, conn)
diff --git a/bench/index.py b/bench/index.py
index 97b67a0..bb08796 100644
--- a/bench/index.py
+++ b/bench/index.py
@@ -168,7 +168,7 @@ async def monitor_index_build(conn, finish: asyncio.Event):
 
 async def main(dataset):
     dataset = h5py.File(Path(args.input), "r")
-    url = f"postgresql://postgres:{args.password}@localhost:5432/postgres",
+    url = f"postgresql://postgres:{args.password}@localhost:5432/postgres"
     conn = await create_connection(url)
     if args.centroids:
         centroids = np.load(args.centroids, allow_pickle=False)
diff --git a/bench/train.py b/bench/train.py
index 4333992..b07f5e5 100644
--- a/bench/train.py
+++ b/bench/train.py
@@ -1,7 +1,10 @@
+import itertools
+from multiprocessing import Pool, cpu_count
 from time import perf_counter
 import argparse
 from pathlib import Path
 from sys import version_info
+from tqdm import tqdm
 
 if version_info >= (3, 12):
     raise RuntimeError("h5py doesn't support 3.12")
@@ -13,6 +16,7 @@
 
 DEFAULT_K = 4096
 N_ITER = 25
+CHUNKS = 10
 SEED = 42
 MAX_POINTS_PER_CLUSTER = 256
 
@@ -35,34 +39,102 @@ def build_arg_parse():
     parser.add_argument(
         "-g", "--gpu", help="enable GPU for KMeans", action="store_true"
     )
+    parser.add_argument(
+        "--in-memory",
+        help="use numpy in-memory sampling, quicker for large datasets",
+        action="store_true",
+    )
+    parser.add_argument(
+        "--chunks",
+        help="number of chunks for in-memory sampling. If OOM, increase it",
+        type=int,
+        default=CHUNKS,
+    )
     return parser
 
 
 def reservoir_sampling(iterator, k: int):
     """Reservoir sampling from an iterator."""
     res = []
-    while len(res) < k:
+    for _ in tqdm(range(k), desc="Collect train subset"):
         try:
             res.append(next(iterator))
         except StopIteration:
             return np.vstack(res)
-    for i, vec in enumerate(iterator, k + 1):
+    for i, vec in tqdm(enumerate(iterator, k + 1), total=k, desc="Random Pick"):
         j = np.random.randint(0, i)
         if j < k:
             res[j] = vec
     return np.vstack(res)
 
 
+def _slice_chunk(args: tuple[int, str, np.ndarray, int]):
+    k, file_path, chunk, start_idx = args
+    dataset = h5py.File(Path(file_path), "r")
+    data = dataset["train"]
+    start, end = min(chunk), max(chunk)
+    indexes = [c - start for c in chunk]
+    source = data[start : end + 1]
+    select = source[indexes]
+    delta, dim = select.shape
+
+    output = np.memmap("index.mmap", dtype=np.float32, mode="r+", shape=(k, dim))
+    output[start_idx : start_idx + delta, :] = select
+    output.flush()
+
+
+def reservoir_sampling_np(data, file_path, k: int, chunks: int):
+    """Uniform random sampling in memory with numpy, parallelized over chunks."""
+    index = np.random.permutation(len(data))[:k]
+    indices = np.sort(index)
+    num_processes = cpu_count()
+    # Split indices into chunks for parallel processing
+    index_chunks = np.array_split(indices, chunks)
+    _, dim = data.shape
+    np.memmap("index.mmap", dtype=np.float32, mode="w+", shape=(k, dim))
+    # Create arguments for each chunk
+    start_idx_acu = [0]
+    start_idx_acu.extend(
+        list(itertools.accumulate([len(c) for c in index_chunks[:-1]]))
+    )
+    chunk_args = [
+        (k, file_path, chunk, start_idx_acu[i]) for i, chunk in enumerate(index_chunks)
+    ]
+    # Process chunks in parallel
+    with Pool(processes=num_processes) as pool:
+        list(pool.map(_slice_chunk, chunk_args))
+
+
 def filter_by_label(iter, labels, target):
     for i, vec in enumerate(iter):
         if labels[i] == target:
             yield vec
 
 
-def kmeans_cluster(data, k, child_k, niter, metric, gpu=False):
+def kmeans_cluster(
+    data,
+    file_path,
+    k,
+    child_k,
+    niter,
+    metric,
+    gpu=False,
+    in_memory=False,
+    chunks=CHUNKS,
+):
     n, dim = data.shape
-    if n > MAX_POINTS_PER_CLUSTER * k:
+    if n > MAX_POINTS_PER_CLUSTER * k and not in_memory:
         train = reservoir_sampling(iter(data), MAX_POINTS_PER_CLUSTER * args.k)
+    elif n > MAX_POINTS_PER_CLUSTER * k and in_memory:
+        reservoir_sampling_np(data, file_path, MAX_POINTS_PER_CLUSTER * args.k, chunks)
+        train = np.array(
+            np.memmap(
+                "index.mmap",
+                dtype=np.float32,
+                mode="r",
+                shape=(MAX_POINTS_PER_CLUSTER * k, dim),
+            )
+        )
     else:
         train = data[:]
     kmeans = Kmeans(
@@ -109,7 +181,15 @@ def kmeans_cluster(
 
     start_time = perf_counter()
     centroids = kmeans_cluster(
-        dataset["train"], args.k, args.child_k, args.niter, args.metric, args.gpu
+        dataset["train"],
+        args.input,
+        args.k,
+        args.child_k,
+        args.niter,
+        args.metric,
+        args.gpu,
+        args.in_memory,
+        args.chunks,
     )
     print(f"K-means (k=({args.k}, {args.child_k})): {perf_counter() - start_time:.2f}s")
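With patch 1 applied, the probe count becomes a CLI knob and the training sample for large datasets can be drawn in parallel through a shared memmap. A minimal usage sketch under the README's sift setup follows; the paths, the chunk count, and the choice of `--nprob 300` are illustrative rather than prescribed by the patch:

```shell
# parallel in-memory sampling; raise --chunks if the worker processes hit OOM
python train.py -i sift.hdf5 -o sift_centroids_4096 -m l2 --in-memory --chunks 10

# sweep rabbithole.nprobe from the command line without editing bench.py
python bench.py -n sift -i sift.hdf5 --nprob 300
```
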
dataset["train"], args.k, args.child_k, args.niter, args.metric, args.gpu + dataset["train"], + args.input, + args.k, + args.child_k, + args.niter, + args.metric, + args.gpu, + args.in_memory, + args.chunks, ) print(f"K-means (k=({args.k}, {args.child_k})): {perf_counter() - start_time:.2f}s") From 5f13dcf3950a4a90946aa98bd50e53f921737f2a Mon Sep 17 00:00:00 2001 From: cutecutecat Date: Fri, 8 Nov 2024 08:42:46 +0800 Subject: [PATCH 2/2] chunk insert at index and norm at train Signed-off-by: cutecutecat --- bench/index.py | 40 ++++++++++++++++++++++++++++------------ bench/train.py | 7 +++++-- 2 files changed, 33 insertions(+), 14 deletions(-) diff --git a/bench/index.py b/bench/index.py index bb08796..963e209 100644 --- a/bench/index.py +++ b/bench/index.py @@ -1,4 +1,5 @@ import asyncio +import math from time import perf_counter import argparse from pathlib import Path @@ -16,6 +17,7 @@ "keepalives_interval": 5, "keepalives_count": 5, } +CHUNKS = 10 def build_arg_parse(): @@ -45,6 +47,12 @@ def build_arg_parse(): required=False, default=max(multiprocessing.cpu_count() - 1, 1), ) + parser.add_argument( + "--chunks", + help="chunks for in-memory mode. If OOM, increase it", + type=int, + default=CHUNKS, + ) return parser @@ -113,21 +121,29 @@ async def add_centroids(conn, name, centroids): await asyncio.sleep(0) -async def add_embeddings(conn, name, dim, train): +async def add_embeddings(conn, name, dim, train, chunks): await conn.execute(f"DROP TABLE IF EXISTS {name}") await conn.execute(f"CREATE TABLE {name} (id integer, embedding vector({dim}))") - async with conn.cursor().copy( - f"COPY {name} (id, embedding) FROM STDIN WITH (FORMAT BINARY)" - ) as copy: - copy.set_types(["integer", "vector"]) + n, dim = train.shape + chunk_size = math.ceil(n / chunks) + pbar = tqdm(desc="Adding embeddings", total=n) + for i in range(chunks): + chunk_start = i * chunk_size + chunk_len = min(chunk_size, n - i * chunk_size) + data = train[chunk_start : chunk_start + chunk_len] - for i, vec in tqdm( - enumerate(train), desc="Adding embeddings", total=len(train) - ): - await copy.write_row((i, vec)) - while conn.pgconn.flush() == 1: - await asyncio.sleep(0) + async with conn.cursor().copy( + f"COPY {name} (id, embedding) FROM STDIN WITH (FORMAT BINARY)" + ) as copy: + copy.set_types(["integer", "vector"]) + + for i, vec in enumerate(data): + await copy.write_row((chunk_start + i, vec)) + while conn.pgconn.flush() == 1: + await asyncio.sleep(0) + pbar.update(chunk_len) + pbar.close() async def build_index( @@ -176,7 +192,7 @@ async def main(dataset): metric_ops, ivf_config = get_ivf_ops_config( args.metric, args.k, args.name if args.centroids else None ) - await add_embeddings(conn, args.name, args.dim, dataset["train"]) + await add_embeddings(conn, args.name, args.dim, dataset["train"], args.chunks) index_finish = asyncio.Event() # Need a separate connection for monitor process diff --git a/bench/train.py b/bench/train.py index b07f5e5..2640300 100644 --- a/bench/train.py +++ b/bench/train.py @@ -5,6 +5,7 @@ from pathlib import Path from sys import version_info from tqdm import tqdm +from numpy import linalg as LA if version_info >= (3, 12): raise RuntimeError("h5py doesn't support 3.12") @@ -35,7 +36,7 @@ def build_arg_parse(): parser.add_argument( "--niter", help="number of iterations", type=int, default=N_ITER ) - parser.add_argument("-m", "--metric", choices=["l2", "cos"], default="l2") + parser.add_argument("-m", "--metric", choices=["l2", "cos", "dot"], default="l2") parser.add_argument( "-g", 
"--gpu", help="enable GPU for KMeans", action="store_true" ) @@ -137,8 +138,10 @@ def kmeans_cluster( ) else: train = data[:] + if metric == "cos": + train = train / LA.norm(train, axis=1, keepdims=True) kmeans = Kmeans( - dim, k, gpu=gpu, verbose=True, niter=niter, seed=SEED, spherical=metric == "cos" + dim, k, gpu=gpu, verbose=True, niter=niter, seed=SEED, spherical=metric != "l2" ) kmeans.train(train) if not child_k: