Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: nprob and parallel sample #46

Merged
merged 2 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 9 additions & 18 deletions bench/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,6 @@ PGPASSWORD=123 psql -h 127.0.0.1 -U postgres -c "CREATE USER bench WITH PASSWORD
PGPASSWORD=123 psql -h 127.0.0.1 -U postgres -c "ALTER ROLE bench SUPERUSER;"
```

## Download Data

```shell
export AWS_ACCESS_KEY_ID=xxx
export AWS_SECRET_ACCESS_KEY=xxx

aws s3 cp s3://pgvecto.rs-bench/sift_128_1m/sift.hdf5 $(whoami)/sift/sift.hdf5
aws s3 cp s3://pgvecto.rs-bench/gist_960_1m/gist.hdf5 $(whoami)/gist/gist.hdf5
aws s3 cp s3://pgvecto.rs-bench/glove_200_1m/glove.hdf5 $(whoami)/glove/glove.hdf5
aws s3 cp s3://pgvecto.rs-bench/openai_1536_500k/openai.hdf5 $(whoami)/openai/openai.hdf5
aws s3 cp s3://pgvecto.rs-bench/cohere-768-1m-2022/cohere.hdf5 $(whoami)/cohere_1m_22/cohere_1m_22.hdf5
aws s3 cp s3://pgvecto.rs-bench/cohere-1024-1m-2023/cohere-1m-23.hdf5 $(whoami)/cohere_1m_23/cohere_1m_23.hdf5
aws s3 cp s3://pgvecto.rs-bench/cohere-1024-10m-2023/cohere-10m-23.hdf5 $(whoami)/cohere_10m_23/cohere_10m_23.hdf5
```

## Run Bench

Options for `-n`:
Expand All @@ -56,15 +41,21 @@ Options for `-n`:
```shell
# pip install pgvector numpy faiss-cpu psycopg h5py tqdm

# If using GPU for train.py:
# conda install pytorch::faiss-gpu

# dump the table's embedding column into a local h5 file (stored under the "train" dataset key)
python dump.py -n sift -o sift.h5 -c embedding -d 128

# external k-means
python train.py -i sift.hdf5 -o sift_centroids_4096
python train.py -i sift.hdf5 -o sift_centroids_4096 -m l2

# build index (w/wo external centroids)
python index.py -n sift -c sift_centroids_4096.npy -i sift.hdf5
## with external centroids
python index.py -n sift -c sift_centroids_4096.npy -i sift.hdf5 -m l2 -p 123 -k 4096 -d 768 -w 4
## without external centroids
## python index.py -n sift -i sift.hdf5 -m l2 -p 123 -k 4096 -d 768 -w 4

# bench
python bench.py -n sift -i sift.hdf5
python bench.py -n sift -i sift.hdf5 --nprob 100
```
5 changes: 4 additions & 1 deletion bench/bench.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ def build_arg_parse():
parser.add_argument(
"-p", "--password", help="Database password", default="password"
)
parser.add_argument(
"--nprob", help="argument rabbithole.nprobe for query", default=300, type=int
)
return parser


Expand Down Expand Up @@ -84,6 +87,6 @@ def bench(name, test, answer, metric_ops, conn):
else:
raise ValueError
conn = create_connection(args.password)
conn.execute("SET rabbithole.nprobe=300")
conn.execute(f"SET rabbithole.nprobe={args.nprob}")

bench(args.name, test, answer, metric_ops, conn)
2 changes: 1 addition & 1 deletion bench/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ async def monitor_index_build(conn, finish: asyncio.Event):

async def main(dataset):
dataset = h5py.File(Path(args.input), "r")
url = f"postgresql://postgres:{args.password}@localhost:5432/postgres",
url = f"postgresql://postgres:{args.password}@localhost:5432/postgres"
conn = await create_connection(url)
if args.centroids:
centroids = np.load(args.centroids, allow_pickle=False)
Expand Down
90 changes: 85 additions & 5 deletions bench/train.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import itertools
from multiprocessing import Pool, cpu_count
from time import perf_counter
import argparse
from pathlib import Path
from sys import version_info
from tqdm import tqdm

if version_info >= (3, 12):
raise RuntimeError("h5py doesn't support 3.12")
Expand All @@ -13,6 +16,7 @@

DEFAULT_K = 4096
N_ITER = 25
CHUNKS = 10
SEED = 42
MAX_POINTS_PER_CLUSTER = 256

Expand All @@ -35,34 +39,102 @@ def build_arg_parse():
parser.add_argument(
"-g", "--gpu", help="enable GPU for KMeans", action="store_true"
)
parser.add_argument(
"--in-memory",
help="use numpy in-memory mode sampling, quicker for large dataset",
action="store_true",
)
parser.add_argument(
"--chunks",
help="chunks for in-memory mode. If OOM, increase it",
cutecutecat marked this conversation as resolved.
Show resolved Hide resolved
type=int,
default=CHUNKS,
)
return parser


def reservoir_sampling(iterator, k: int):
    """Uniformly sample up to ``k`` vectors from an iterator of unknown length.

    Classic reservoir sampling (Algorithm R): the first ``k`` vectors fill the
    reservoir, then the i-th vector (1-based) replaces a random slot with
    probability ``k / i``, which keeps every vector equally likely to survive.

    Args:
        iterator: yields vectors (1-D arrays) one at a time.
        k: target sample size.

    Returns:
        np.ndarray of shape ``(m, dim)`` with ``m <= k`` (``m < k`` only when
        the iterator is exhausted before the reservoir fills).
    """
    res = []
    for _ in tqdm(range(k), desc="Collect train subset"):
        try:
            res.append(next(iterator))
        except StopIteration:
            # Fewer than k vectors available: return everything we collected.
            return np.vstack(res)
    # NOTE(review): total=k is only a progress-bar hint; the iterator may
    # yield more (or fewer) than k remaining vectors.
    for i, vec in tqdm(enumerate(iterator, k + 1), total=k, desc="Random Pick"):
        # np.random.randint(0, i) is uniform over [0, i), so P(j < k) == k / i.
        j = np.random.randint(0, i)
        if j < k:
            res[j] = vec
    return np.vstack(res)


def _slice_chunk(args: tuple[int, str, np.ndarray, int]):
    """Worker: copy one chunk of sampled rows into the shared "index.mmap" file.

    Args (packed in a single tuple so it can ride through ``Pool.map``):
        k: total number of sampled rows; fixes the memmap's row count.
        file_path: path of the source HDF5 file (its "train" dataset is read).
        chunk: sorted indices into "train" that this worker must copy.
        start_idx: row offset inside the memmap where this chunk is written.
    """
    k, file_path, chunk, start_idx = args
    # Read one covering slice [min, max] and pick rows out of it locally:
    # a single contiguous HDF5 read is far cheaper than per-row fancy reads.
    with h5py.File(Path(file_path), "r") as dataset:
        data = dataset["train"]
        start, end = min(chunk), max(chunk)
        indexes = [c - start for c in chunk]
        source = data[start : end + 1]
    select = source[indexes]
    delta, dim = select.shape

    # Each worker reopens the pre-created memmap in "r+" mode and writes its
    # own disjoint row range, so no inter-process locking is required.
    output = np.memmap("index.mmap", dtype=np.float32, mode="r+", shape=(k, dim))
    output[start_idx : start_idx + delta, :] = select
    output.flush()


def reservoir_sampling_np(data, file_path, k: int, chunks: int):
    """Sample ``k`` distinct rows of ``data`` into an on-disk "index.mmap" file.

    Despite the name (kept for symmetry with ``reservoir_sampling``), this is
    not streaming reservoir sampling: it draws a uniform random subset via a
    permutation of the row indices, then copies the chosen rows in parallel.

    Args:
        data: h5py dataset (or array-like) of shape ``(n, dim)``.
        file_path: path of the source HDF5 file, re-opened by each worker.
        k: number of rows to sample.
        chunks: how many pieces to split the work into (more pieces =
            smaller per-worker covering slices = lower peak memory).

    Side effect: writes the sampled rows to "index.mmap" in the CWD;
    returns nothing.
    """
    index = np.random.permutation(len(data))[:k]
    # Sorted indices let each worker read one contiguous HDF5 slice.
    indices = np.sort(index)
    num_processes = cpu_count()
    # Split indices into chunks for parallel processing.
    index_chunks = np.array_split(indices, chunks)
    _, dim = data.shape
    # Pre-create the shared output file; workers reopen it in "r+" mode.
    np.memmap("index.mmap", dtype=np.float32, mode="w+", shape=(k, dim))
    # Exclusive prefix sums: row offset of each chunk inside the output.
    start_idx_acu = [0]
    start_idx_acu.extend(itertools.accumulate(len(c) for c in index_chunks[:-1]))
    chunk_args = [
        (k, file_path, chunk, start_idx_acu[i]) for i, chunk in enumerate(index_chunks)
    ]
    # Process chunks in parallel; map blocks until all workers finish.
    with Pool(processes=num_processes) as pool:
        pool.map(_slice_chunk, chunk_args)


def filter_by_label(iter, labels, target):
    """Yield only the vectors whose positional label equals ``target``.

    ``labels`` is indexed by position, so it must be at least as long as the
    vector sequence.

    NOTE(review): the first parameter shadows the ``iter`` builtin; it is kept
    as-is because renaming a caller-visible parameter would not be
    backward-compatible.
    """
    for position, vector in enumerate(iter):
        if labels[position] != target:
            continue
        yield vector


def kmeans_cluster(data, k, child_k, niter, metric, gpu=False):
def kmeans_cluster(
data,
file_path,
k,
child_k,
niter,
metric,
gpu=False,
in_memory=False,
chunks=CHUNKS,
):
n, dim = data.shape
if n > MAX_POINTS_PER_CLUSTER * k:
if n > MAX_POINTS_PER_CLUSTER * k and not in_memory:
train = reservoir_sampling(iter(data), MAX_POINTS_PER_CLUSTER * args.k)
elif n > MAX_POINTS_PER_CLUSTER * k and in_memory:
reservoir_sampling_np(data, file_path, MAX_POINTS_PER_CLUSTER * args.k, chunks)
train = np.array(
np.memmap(
"index.mmap",
dtype=np.float32,
mode="r",
shape=(MAX_POINTS_PER_CLUSTER * k, dim),
)
)
else:
train = data[:]
kmeans = Kmeans(
Expand Down Expand Up @@ -109,7 +181,15 @@ def kmeans_cluster(data, k, child_k, niter, metric, gpu=False):

start_time = perf_counter()
centroids = kmeans_cluster(
dataset["train"], args.k, args.child_k, args.niter, args.metric, args.gpu
dataset["train"],
args.input,
args.k,
args.child_k,
args.niter,
args.metric,
args.gpu,
args.in_memory,
args.chunks,
)
print(f"K-means (k=({args.k}, {args.child_k})): {perf_counter() - start_time:.2f}s")

Expand Down