diff --git a/engine/clients/pgvector/config.py b/engine/clients/pgvector/config.py
index 2d40021..a206864 100644
--- a/engine/clients/pgvector/config.py
+++ b/engine/clients/pgvector/config.py
@@ -13,9 +13,9 @@
 }
 
 DISTANCE_MAPPING_CREATE_RUST = {
-    Distance.L2: "l2_ops",
-    Distance.COSINE: "cosine_ops",
-    Distance.DOT: "dot_ops",
+    Distance.L2: "vector_l2_ops",
+    Distance.COSINE: "vector_cos_ops",
+    Distance.DOT: "vector_dot_ops",
 }
 
 DISTANCE_MAPPING_SEARCH = {
diff --git a/engine/clients/pgvector/search.py b/engine/clients/pgvector/search.py
index d36c769..ee95ffb 100644
--- a/engine/clients/pgvector/search.py
+++ b/engine/clients/pgvector/search.py
@@ -33,12 +33,8 @@ def search_one(cls, vector: List[float], meta_conditions, top: Optional[int], sc
         cur.execute("BEGIN;")
         # set index create parameter
         for key in cls.search_params["params"].keys():
-            if cls.engine_type == "c":
-                cur.execute(f"SET LOCAL {key} = {cls.search_params['params'][key]};")
-            else:
-                # pgvector_rs only support hnsw
-                cur.execute(f"SET LOCAL vectors.k = {cls.search_params['params']['hnsw.ef_search']};")
-            break
+            cur.execute(f"SET LOCAL {key} = {cls.search_params['params'][key]};")
+
 
         meta_conditions = cls.parser.parse(meta_conditions)
         if meta_conditions:
diff --git a/engine/clients/pgvector/upload.py b/engine/clients/pgvector/upload.py
index c7e0b54..5510853 100644
--- a/engine/clients/pgvector/upload.py
+++ b/engine/clients/pgvector/upload.py
@@ -1,4 +1,5 @@
 import time
+import toml
 from typing import List, Optional
 import psycopg2
 from engine.base_client import BaseUploader
@@ -13,28 +14,45 @@ class PGVectorUploader(BaseUploader):
     vector_count: int = None
 
     @classmethod
-    def init_client(cls, host, distance, vector_count, connection_params, upload_params,
-                    extra_columns_name: list, extra_columns_type: list):
-        database, host, port, user, password = process_connection_params(connection_params, host)
-        cls.conn = psycopg2.connect(database=database, user=user, password=password, host=host, port=port)
+    def init_client(
+        cls,
+        host,
+        distance,
+        vector_count,
+        connection_params,
+        upload_params,
+        extra_columns_name: list,
+        extra_columns_type: list,
+    ):
+        database, host, port, user, password = process_connection_params(
+            connection_params, host
+        )
+        cls.conn = psycopg2.connect(
+            database=database, user=user, password=password, host=host, port=port
+        )
         cls.host = host
         cls.upload_params = upload_params
         cls.engine_type = upload_params.get("engine_type", "c")
-        cls.distance = DISTANCE_MAPPING_CREATE[distance] if cls.engine_type == "c" else DISTANCE_MAPPING_CREATE_RUST[
-            distance]
+        cls.distance = (
+            DISTANCE_MAPPING_CREATE[distance]
+            if cls.engine_type == "c"
+            else DISTANCE_MAPPING_CREATE_RUST[distance]
+        )
         cls.vector_count = vector_count
 
     @classmethod
-    def upload_batch(cls, ids: List[int], vectors: List[list], metadata: List[Optional[dict]]):
+    def upload_batch(
+        cls, ids: List[int], vectors: List[list], metadata: List[Optional[dict]]
+    ):
         if len(ids) != len(vectors):
             raise RuntimeError("PGVector batch upload unhealthy")
         # Getting the names of structured data columns based on the first meta information.
-        col_name_tuple = ('id', 'vector')
-        col_type_tuple = ('%s', '%s::real[]')
+        col_name_tuple = ("id", "vector")
+        col_type_tuple = ("%s", "%s::real[]")
         if metadata[0] is not None:
             for col_name in list(metadata[0].keys()):
                 col_name_tuple += (col_name,)
-                col_type_tuple += ('%s',)
+                col_type_tuple += ("%s",)
 
         insert_data = []
         for i in range(0, len(ids)):
@@ -43,7 +61,9 @@ def upload_batch(cls, ids: List[int], vectors: List[list], metadata: List[Option
             for col_name in list(metadata[i].keys()):
                 value = metadata[i][col_name]
                 # Determining if the data is a dictionary type of latitude and longitude.
-                if isinstance(value, dict) and ('lon' and 'lat') in list(value.keys()):
+                if isinstance(value, dict) and (
+                    "lon" in value and "lat" in value
+                ):
                     raise RuntimeError("Postgres doesn't support geo datasets")
                 else:
                     temp_tuple += (value,)
@@ -63,25 +83,22 @@ def upload_batch(cls, ids: List[int], vectors: List[list], metadata: List[Option
     @classmethod
     def post_upload(cls, distance):
-        index_options_c = ""
-        index_options_rust = ""
-        for key in cls.upload_params.get("index_params", {}).keys():
-            index_options_c += ("{}={}" if index_options_c == "" else ", {}={}").format(
-                key, cls.upload_params.get('index_params', {})[key])
-            index_options_rust += ("{}={}" if index_options_rust == "" else "\n{}={}").format(
-                key, cls.upload_params.get('index_params', {})[key])
-        create_index_command = f"CREATE INDEX ON {PGVECTOR_INDEX} USING hnsw (vector {cls.distance}) WITH ({index_options_c});"
-        if cls.engine_type == "rust":
+        if cls.engine_type == "c":
+            index_options_c = ""
+            for key in cls.upload_params.get("index_params", {}).keys():
+                index_options_c += (
+                    "{}={}" if index_options_c == "" else ", {}={}"
+                ).format(key, cls.upload_params.get("index_params", {})[key])
+            create_index_command = f"CREATE INDEX ON {PGVECTOR_INDEX} USING hnsw (vector {cls.distance}) WITH ({index_options_c});"
+        elif cls.engine_type == "rust":
+            index_options_rust = toml.dumps(cls.upload_params.get("index_params", {}))
             create_index_command = f"""
 CREATE INDEX ON {PGVECTOR_INDEX}
 USING vectors (vector {cls.distance})
 WITH (options=$$
-capacity = {int(cls.vector_count*1.2)}
-[vectors]
-memmap = "ram"
-[algorithm.hnsw]
-memmap = "ram"
 {index_options_rust}
 $$);
 """
+        else:
+            raise ValueError("PGVector engine type must be c or rust")
         # create index (blocking)
         with cls.conn.cursor() as cur:
@@ -90,5 +107,7 @@ def post_upload(cls, distance):
         cls.conn.commit()
         # wait index finished
         with cls.conn.cursor() as cur:
-            cur.execute("SELECT phase, tuples_done, tuples_total FROM pg_stat_progress_create_index;")
+            cur.execute(
+                "SELECT phase, tuples_done, tuples_total FROM pg_stat_progress_create_index;"
+            )
             cls.conn.commit()
diff --git a/experiments/needs_editing/pgvector_rust_HNSW_single_node_laion-768-5m-ip.json b/experiments/needs_editing/pgvector_rust_HNSW_single_node_laion-768-5m-ip.json
index 8f91e8f..34e7b35 100644
--- a/experiments/needs_editing/pgvector_rust_HNSW_single_node_laion-768-5m-ip.json
+++ b/experiments/needs_editing/pgvector_rust_HNSW_single_node_laion-768-5m-ip.json
@@ -24,28 +24,28 @@
       "parallel": 4,
       "top": 10,
       "params": {
-        "hnsw.ef_search": 100
+        "vectors.hnsw_ef_search": 100
       }
     },
     {
       "parallel": 4,
       "top": 100,
       "params": {
-        "hnsw.ef_search": 100
+        "vectors.hnsw_ef_search": 100
       }
     },
     {
       "parallel": 8,
       "top": 10,
       "params": {
-        "hnsw.ef_search": 100
+        "vectors.hnsw_ef_search": 100
       }
     },
     {
       "parallel": 8,
       "top": 100,
       "params": {
-        "hnsw.ef_search": 100
+        "vectors.hnsw_ef_search": 100
      }
    }
  ],
@@ -53,8 +53,12 @@
    "parallel": 16,
"batch_size": 64, "index_params": { - "m": 12, - "ef_construction": 100 + "indexing": { + "hnsw": { + "m": 12, + "ef_construction": 100 + } + } }, "index_type": "hnsw", "engine_type": "rust" diff --git a/experiments/needs_editing/pgvector_rust_HNSW_single_node_laion-768-5m-probability-ip.json b/experiments/needs_editing/pgvector_rust_HNSW_single_node_laion-768-5m-probability-ip.json index af2f8f4..c890020 100644 --- a/experiments/needs_editing/pgvector_rust_HNSW_single_node_laion-768-5m-probability-ip.json +++ b/experiments/needs_editing/pgvector_rust_HNSW_single_node_laion-768-5m-probability-ip.json @@ -24,7 +24,7 @@ "parallel": 4, "top": 10, "params": { - "hnsw.ef_search": 100 + "vectors.hnsw_ef_search": 100 }, "query_meta": { "probability": 0.01 @@ -34,7 +34,7 @@ "parallel": 4, "top": 10, "params": { - "hnsw.ef_search": 100 + "vectors.hnsw_ef_search": 100 }, "query_meta": { "probability": 0.1 @@ -44,7 +44,7 @@ "parallel": 4, "top": 100, "params": { - "hnsw.ef_search": 100 + "vectors.hnsw_ef_search": 100 }, "query_meta": { "probability": 0.01 @@ -54,7 +54,7 @@ "parallel": 4, "top": 100, "params": { - "hnsw.ef_search": 100 + "vectors.hnsw_ef_search": 100 }, "query_meta": { "probability": 0.1 @@ -64,7 +64,7 @@ "parallel": 8, "top": 10, "params": { - "hnsw.ef_search": 100 + "vectors.hnsw_ef_search": 100 }, "query_meta": { "probability": 0.01 @@ -74,7 +74,7 @@ "parallel": 8, "top": 10, "params": { - "hnsw.ef_search": 100 + "vectors.hnsw_ef_search": 100 }, "query_meta": { "probability": 0.1 @@ -84,7 +84,7 @@ "parallel": 8, "top": 100, "params": { - "hnsw.ef_search": 100 + "vectors.hnsw_ef_search": 100 }, "query_meta": { "probability": 0.01 @@ -94,7 +94,7 @@ "parallel": 8, "top": 100, "params": { - "hnsw.ef_search": 100 + "vectors.hnsw_ef_search": 100 }, "query_meta": { "probability": 0.1 @@ -105,8 +105,12 @@ "parallel": 16, "batch_size": 64, "index_params": { - "m": 12, - "ef_construction": 100 + "indexing": { + "hnsw": { + "m": 12, + "ef_construction": 100 + } + } }, "index_type": "hnsw", "engine_type": "rust" diff --git a/poetry.lock b/poetry.lock index cb69684..f24c4b4 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1875,6 +1875,17 @@ files = [ {file = "stopit-1.1.2.tar.gz", hash = "sha256:f7f39c583fd92027bd9d06127b259aee7a5b7945c1f1fa56263811e1e766996d"}, ] +[[package]] +name = "toml" +version = "0.10.2" +description = "Python Library for Tom's Obvious, Minimal Language" +optional = false +python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" +files = [ + {file = "toml-0.10.2-py2.py3-none-any.whl", hash = "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b"}, + {file = "toml-0.10.2.tar.gz", hash = "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f"}, +] + [[package]] name = "tomli" version = "2.0.1" @@ -2194,4 +2205,4 @@ cffi = ["cffi (>=1.11)"] [metadata] lock-version = "2.0" python-versions = ">=3.8,<3.11" -content-hash = "3fc65c00d77b7b37309e8ea64a515e81cb3a455540c12cfce23637b6cae035ff" +content-hash = "6fe42fa0b777a3113768142ec92cd1f81f056e0d64fcfadcfa34b2d7433a48ab" diff --git a/pyproject.toml b/pyproject.toml index 1caef3b..b7657f2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,6 +24,7 @@ pinecone-client = "^2.2.4" pyproximase = "^0.7.0" boto3 = "^1.34.15" psycopg2-binary = "^2.9.9" +toml = "^0.10.2" [tool.poetry.dev-dependencies]