diff --git a/java/core/lance-jni/src/utils.rs b/java/core/lance-jni/src/utils.rs index 35dc58a440..41cfb87fe2 100644 --- a/java/core/lance-jni/src/utils.rs +++ b/java/core/lance-jni/src/utils.rs @@ -265,6 +265,7 @@ pub fn get_index_params( Some(VectorIndexParams { metric_type: distance_type, stages, + force_use_new_index_format: None, }) } else { None diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index f450992b0c..d058201f02 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -1449,6 +1449,7 @@ def create_index( storage_options: Optional[Dict[str, str]] = None, filter_nan: bool = True, one_pass_ivfpq: bool = False, + use_new_vector_index_format: Optional[bool] = None, **kwargs, ) -> LanceDataset: """Create index on column. @@ -1511,6 +1512,9 @@ def create_index( for nullable columns. Obtains a small speed boost. one_pass_ivfpq: bool Defaults to False. If enabled, index type must be "IVF_PQ". Reduces disk IO. + use_new_vector_index_format : bool, optional + Allows to use the new V3 index format. Default is False. This is mostly for + IVF_PQ indices which still defaults to use the legacy format. kwargs : Parameters passed to the index building process. @@ -1900,6 +1904,9 @@ def create_index( ) kwargs["pq_codebook"] = pq_codebook_batch + if index_type == "IVF_PQ" and use_new_vector_index_format is not None: + kwargs["use_new_vector_index_format"] = use_new_vector_index_format + if shuffle_partition_batches is not None: kwargs["shuffle_partition_batches"] = shuffle_partition_batches if shuffle_partition_concurrency is not None: diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index ac49b6f90f..b1415d01b4 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -30,6 +30,7 @@ from lance.commit import CommitConflictError from lance.debug import format_fragment from lance.util import validate_vector_index +from lance.file import LanceFileReader # Various valid inputs for write_dataset input_schema = pa.schema([pa.field("a", pa.float64()), pa.field("b", pa.int64())]) @@ -2358,6 +2359,12 @@ def test_count_index_rows(tmp_path: Path): assert dataset.stats.index_stats(index_name)["num_unindexed_rows"] == 512 assert dataset.stats.index_stats(index_name)["num_indexed_rows"] == 512 + indices = dataset.list_indices() + index_uuid = indices[0]["uuid"] + index_dir = base_dir / f"_indices/{index_uuid}" + index_path = os.listdir(index_dir) + assert len(index_path) == 1 + def test_dataset_progress(tmp_path: Path): data = pa.table({"a": range(10)}) @@ -2650,3 +2657,53 @@ def test_default_storage_version(tmp_path: Path): sample_file = frag.to_json()["files"][0] assert sample_file["file_major_version"] == EXPECTED_MAJOR_VERSION assert sample_file["file_minor_version"] == EXPECTED_MINOR_VERSION + + +def test_ivf_index_creation_with_v3_version(tmp_path: Path): + dims = 32 + schema = pa.schema([pa.field("a", pa.list_(pa.float32(), dims), False)]) + values = pc.random(512 * dims).cast("float32") + table = pa.Table.from_pydict( + {"a": pa.FixedSizeListArray.from_arrays(values, dims)}, schema=schema + ) + + base_dir = tmp_path / "test" + + dataset = lance.write_dataset(table, base_dir) + + # assert we return None for index name that doesn't exist + index_name = "a_idx" + with pytest.raises(KeyError): + dataset.stats.index_stats(index_name)["num_unindexed_rows"] + with pytest.raises(KeyError): + dataset.stats.index_stats(index_name)["num_indexed_rows"] + + # create index and assert no rows are uncounted + dataset.create_index( + "a", + "IVF_PQ", + name=index_name, + num_partitions=2, + num_sub_vectors=1, + use_new_vector_index_format=True, + ) + assert dataset.stats.index_stats(index_name)["num_unindexed_rows"] == 0 + assert dataset.stats.index_stats(index_name)["num_indexed_rows"] == 512 + + # append some data + new_table = pa.Table.from_pydict( + {"a": [[float(i) for i in range(32)] for _ in range(512)]}, schema=schema + ) + dataset = lance.write_dataset(new_table, base_dir, mode="append") + + # assert rows added since index was created are uncounted + assert dataset.stats.index_stats(index_name)["num_unindexed_rows"] == 512 + assert dataset.stats.index_stats(index_name)["num_indexed_rows"] == 512 + + indices = dataset.list_indices() + index_uuid = indices[0]["uuid"] + storage_path = base_dir / f"_indices/{index_uuid}/auxiliary.idx" + reader = LanceFileReader(str(storage_path)) + metadata = reader.metadata() + assert metadata.major_version == 0 + assert metadata.minor_version == 3 diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 31bbf3d544..7181ea7469 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -1564,6 +1564,7 @@ fn prepare_vector_index_params( let mut hnsw_params = HnswBuildParams::default(); let mut pq_params = PQBuildParams::default(); let mut sq_params = SQBuildParams::default(); + let mut use_new_vector_index_format = false; if let Some(kwargs) = kwargs { // Parse metric type @@ -1623,6 +1624,10 @@ fn prepare_vector_index_params( ivf_params.storage_options = Some(storage_options); } + if let Some(use_new_format) = kwargs.get_item("use_new_vector_index_format")? { + use_new_vector_index_format = use_new_format.extract::()?; + }; + match ( kwargs.get_item("precomputed_shuffle_buffers")?, kwargs.get_item("precomputed_shuffle_buffers_path")? @@ -1683,9 +1688,17 @@ fn prepare_vector_index_params( } match index_type { - "IVF_PQ" => Ok(Box::new(VectorIndexParams::with_ivf_pq_params( - m_type, ivf_params, pq_params, - ))), + "IVF_PQ" => { + if use_new_vector_index_format { + Ok(Box::new(VectorIndexParams::with_ivf_pq_params_v3( + m_type, ivf_params, pq_params, + ))) + } else { + Ok(Box::new(VectorIndexParams::with_ivf_pq_params( + m_type, ivf_params, pq_params, + ))) + } + } "IVF_HNSW_PQ" => Ok(Box::new(VectorIndexParams::with_ivf_hnsw_pq_params( m_type, diff --git a/rust/lance/src/index/vector.rs b/rust/lance/src/index/vector.rs index ccde0156a1..833f8f9263 100644 --- a/rust/lance/src/index/vector.rs +++ b/rust/lance/src/index/vector.rs @@ -65,6 +65,10 @@ pub struct VectorIndexParams { /// Vector distance metrics type. pub metric_type: MetricType, + + /// Use V3 Index builder. + /// Only used by IVF_PQ index since IVF_PQ still creates old index format by default. + pub force_use_new_index_format: Option, } impl VectorIndexParams { @@ -74,6 +78,7 @@ impl VectorIndexParams { Self { stages, metric_type, + force_use_new_index_format: None, } } @@ -106,6 +111,7 @@ impl VectorIndexParams { Self { stages, metric_type, + force_use_new_index_format: None, } } @@ -119,9 +125,21 @@ impl VectorIndexParams { Self { stages, metric_type, + force_use_new_index_format: None, } } + /// Create index parameters with `IVF` and `PQ` parameters for V3 Index, respectively. + pub fn with_ivf_pq_params_v3( + metric_type: MetricType, + ivf: IvfBuildParams, + pq: PQBuildParams, + ) -> Self { + let mut params = Self::with_ivf_pq_params(metric_type, ivf, pq); + params.force_use_new_index_format = Some(true); + params + } + /// Create index parameters with `IVF`, `PQ` and `HNSW` parameters, respectively. /// This is used for `IVF_HNSW_PQ` index. pub fn with_ivf_hnsw_pq_params( @@ -138,6 +156,7 @@ impl VectorIndexParams { Self { stages, metric_type, + force_use_new_index_format: None, } } @@ -157,6 +176,7 @@ impl VectorIndexParams { Self { stages, metric_type, + force_use_new_index_format: None, } } } @@ -252,16 +272,33 @@ pub(crate) async fn build_vector_index( }); }; - build_ivf_pq_index( - dataset, - column, - name, - uuid, - params.metric_type, - ivf_params, - pq_params, - ) - .await?; + let use_v3_index_format = params.force_use_new_index_format.unwrap_or(false); + + if use_v3_index_format { + IvfIndexBuilder::::new( + dataset.clone(), + column.to_owned(), + dataset.indices_dir().child(uuid), + params.metric_type, + Box::new(shuffler), + Some(ivf_params.clone()), + Some(pq_params.clone()), + (), + )? + .build() + .await?; + } else { + build_ivf_pq_index( + dataset, + column, + name, + uuid, + params.metric_type, + ivf_params, + pq_params, + ) + .await?; + } } else if is_ivf_hnsw(stages) { let len = stages.len(); let StageParams::Hnsw(hnsw_params) = &stages[1] else { diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index ae68b0582d..e56142074b 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -2210,8 +2210,7 @@ mod tests { .await; } - #[tokio::test] - async fn test_create_ivf_pq_cosine() { + async fn run_ivf_pq_cosine_test(is_v3_index: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); @@ -2223,8 +2222,11 @@ mod tests { let pq_params = PQBuildParams::new(4, 8); - let params = - VectorIndexParams::with_ivf_pq_params(MetricType::Cosine, ivf_params, pq_params); + let params: VectorIndexParams = if is_v3_index { + VectorIndexParams::with_ivf_pq_params_v3(MetricType::Cosine, ivf_params, pq_params) + } else { + VectorIndexParams::with_ivf_pq_params(MetricType::Cosine, ivf_params, pq_params) + }; dataset .create_index(&["vector"], IndexType::Vector, None, ¶ms, false) @@ -2260,68 +2262,7 @@ mod tests { } } - #[tokio::test] - async fn test_build_ivf_model_l2() { - let test_dir = tempdir().unwrap(); - let test_uri = test_dir.path().to_str().unwrap(); - - let (dataset, _) = generate_test_dataset(test_uri, 1000.0..1100.0).await; - - let ivf_params = IvfBuildParams::new(2); - let ivf_model = build_ivf_model(&dataset, "vector", DIM, MetricType::L2, &ivf_params) - .await - .unwrap(); - assert_eq!(2, ivf_model.centroids.as_ref().unwrap().len()); - assert_eq!(32, ivf_model.centroids.as_ref().unwrap().value_length()); - assert_eq!(2, ivf_model.num_partitions()); - - // All centroids values should be in the range [1000, 1100] - ivf_model - .centroids - .unwrap() - .values() - .as_primitive::() - .values() - .iter() - .for_each(|v| { - assert!((1000.0..1100.0).contains(v)); - }); - } - - #[tokio::test] - async fn test_build_ivf_model_cosine() { - let test_dir = tempdir().unwrap(); - let test_uri = test_dir.path().to_str().unwrap(); - - let (dataset, _) = generate_test_dataset(test_uri, 1000.0..1100.0).await; - - let ivf_params = IvfBuildParams::new(2); - let ivf_model = build_ivf_model(&dataset, "vector", DIM, MetricType::Cosine, &ivf_params) - .await - .unwrap(); - assert_eq!(2, ivf_model.centroids.as_ref().unwrap().len()); - assert_eq!(32, ivf_model.centroids.as_ref().unwrap().value_length()); - assert_eq!(2, ivf_model.num_partitions()); - - // All centroids values should be in the range [1000, 1100] - ivf_model - .centroids - .unwrap() - .values() - .as_primitive::() - .values() - .iter() - .for_each(|v| { - assert!( - (-1.0..1.0).contains(v), - "Expect cosine value in range [-1.0, 1.0], got: {}", - v - ); - }); - } - - #[tokio::test] - async fn test_create_ivf_pq_dot() { + async fn run_ivf_pq_dot_test(is_v3_index: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); @@ -2334,7 +2275,11 @@ mod tests { let codebook = Arc::new(generate_random_array(256 * DIM)); let pq_params = PQBuildParams::with_codebook(4, 8, codebook); - let params = VectorIndexParams::with_ivf_pq_params(MetricType::Dot, ivf_params, pq_params); + let params: VectorIndexParams = if is_v3_index { + VectorIndexParams::with_ivf_pq_params_v3(MetricType::Dot, ivf_params, pq_params) + } else { + VectorIndexParams::with_ivf_pq_params(MetricType::Dot, ivf_params, pq_params) + }; dataset .create_index(&["vector"], IndexType::Vector, None, ¶ms, false) @@ -2371,8 +2316,7 @@ mod tests { } } - #[tokio::test] - async fn test_create_ivf_pq_f16() { + async fn run_create_ivf_pq_f16_test(is_v3_index: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); @@ -2392,11 +2336,20 @@ mod tests { let batches = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone()); let mut dataset = Dataset::write(batches, test_uri, None).await.unwrap(); - let params = VectorIndexParams::with_ivf_pq_params( - MetricType::L2, - IvfBuildParams::new(2), - PQBuildParams::new(4, 8), - ); + let params: VectorIndexParams = if is_v3_index { + VectorIndexParams::with_ivf_pq_params_v3( + MetricType::L2, + IvfBuildParams::new(2), + PQBuildParams::new(4, 8), + ) + } else { + VectorIndexParams::with_ivf_pq_params( + MetricType::L2, + IvfBuildParams::new(2), + PQBuildParams::new(4, 8), + ) + }; + dataset .create_index(&["vector"], IndexType::Vector, None, ¶ms, false) .await @@ -2435,8 +2388,7 @@ mod tests { ); } - #[tokio::test] - async fn test_create_ivf_pq_f16_with_codebook() { + async fn run_create_ivf_pq_f16_with_codebook_test(is_v3_index: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); @@ -2460,11 +2412,20 @@ mod tests { 256 * DIM, [22; 32], )); - let params = VectorIndexParams::with_ivf_pq_params( - MetricType::L2, - IvfBuildParams::new(2), - PQBuildParams::with_codebook(4, 8, codebook), - ); + let params: VectorIndexParams = if is_v3_index { + VectorIndexParams::with_ivf_pq_params_v3( + MetricType::L2, + IvfBuildParams::new(2), + PQBuildParams::with_codebook(4, 8, codebook), + ) + } else { + VectorIndexParams::with_ivf_pq_params( + MetricType::L2, + IvfBuildParams::new(2), + PQBuildParams::with_codebook(4, 8, codebook), + ) + }; + dataset .create_index(&["vector"], IndexType::Vector, None, ¶ms, false) .await @@ -2503,8 +2464,7 @@ mod tests { ); } - #[tokio::test] - async fn test_create_ivf_pq_with_invalid_num_sub_vectors() { + async fn run_create_ivf_pq_with_invalid_num_sub_vectors_test(is_v3_index: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); @@ -2524,11 +2484,20 @@ mod tests { let batches = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone()); let mut dataset = Dataset::write(batches, test_uri, None).await.unwrap(); - let params = VectorIndexParams::with_ivf_pq_params( - MetricType::L2, - IvfBuildParams::new(256), - PQBuildParams::new(6, 8), - ); + let params: VectorIndexParams = if is_v3_index { + VectorIndexParams::with_ivf_pq_params_v3( + MetricType::L2, + IvfBuildParams::new(256), + PQBuildParams::new(6, 8), + ) + } else { + VectorIndexParams::with_ivf_pq_params( + MetricType::L2, + IvfBuildParams::new(256), + PQBuildParams::new(6, 8), + ) + }; + let res = dataset .create_index(&["vector"], IndexType::Vector, None, ¶ms, false) .await; @@ -2546,6 +2515,116 @@ mod tests { } } + #[tokio::test] + async fn test_create_ivf_pq_cosine() { + run_ivf_pq_cosine_test(false).await; + } + + #[tokio::test] + async fn test_create_ivf_pq_v3_cosine() { + run_ivf_pq_cosine_test(true).await; + } + + #[tokio::test] + async fn test_build_ivf_model_l2() { + let test_dir = tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + + let (dataset, _) = generate_test_dataset(test_uri, 1000.0..1100.0).await; + + let ivf_params = IvfBuildParams::new(2); + let ivf_model = build_ivf_model(&dataset, "vector", DIM, MetricType::L2, &ivf_params) + .await + .unwrap(); + assert_eq!(2, ivf_model.centroids.as_ref().unwrap().len()); + assert_eq!(32, ivf_model.centroids.as_ref().unwrap().value_length()); + assert_eq!(2, ivf_model.num_partitions()); + + // All centroids values should be in the range [1000, 1100] + ivf_model + .centroids + .unwrap() + .values() + .as_primitive::() + .values() + .iter() + .for_each(|v| { + assert!((1000.0..1100.0).contains(v)); + }); + } + + #[tokio::test] + async fn test_build_ivf_model_cosine() { + let test_dir = tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + + let (dataset, _) = generate_test_dataset(test_uri, 1000.0..1100.0).await; + + let ivf_params = IvfBuildParams::new(2); + let ivf_model = build_ivf_model(&dataset, "vector", DIM, MetricType::Cosine, &ivf_params) + .await + .unwrap(); + assert_eq!(2, ivf_model.centroids.as_ref().unwrap().len()); + assert_eq!(32, ivf_model.centroids.as_ref().unwrap().value_length()); + assert_eq!(2, ivf_model.num_partitions()); + + // All centroids values should be in the range [1000, 1100] + ivf_model + .centroids + .unwrap() + .values() + .as_primitive::() + .values() + .iter() + .for_each(|v| { + assert!( + (-1.0..1.0).contains(v), + "Expect cosine value in range [-1.0, 1.0], got: {}", + v + ); + }); + } + + #[tokio::test] + async fn test_create_ivf_pq_dot() { + run_ivf_pq_dot_test(false).await; + } + + #[tokio::test] + async fn test_create_ivf_pq_v3_dot() { + run_ivf_pq_dot_test(true).await; + } + + #[tokio::test] + async fn test_create_ivf_pq_f16() { + run_create_ivf_pq_f16_test(false).await; + } + + #[tokio::test] + async fn test_create_ivf_pq_v3_f16() { + run_create_ivf_pq_f16_test(true).await; + } + + #[tokio::test] + async fn test_create_ivf_pq_f16_with_codebook() { + run_create_ivf_pq_f16_with_codebook_test(false).await; + } + + #[tokio::test] + async fn test_create_ivf_pq_v3_f16_with_codebook() { + run_create_ivf_pq_f16_with_codebook_test(true).await; + } + + #[tokio::test] + async fn test_create_ivf_pq_with_invalid_num_sub_vectors() { + run_create_ivf_pq_with_invalid_num_sub_vectors_test(false).await; + } + + #[tokio::test] + async fn test_create_ivf_pq_v3_with_invalid_num_sub_vectors() { + run_create_ivf_pq_with_invalid_num_sub_vectors_test(true).await; + } + fn ground_truth( fsl: &FixedSizeListArray, query: &[f32], diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index 2f359aa610..33bb75e50c 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -668,6 +668,22 @@ mod tests { test_index(params, nlist, recall_requirement).await; } + #[rstest] + #[case(4, DistanceType::L2, 0.9)] + #[case(4, DistanceType::Cosine, 0.6)] + #[case(4, DistanceType::Dot, 0.2)] + #[tokio::test] + async fn test_build_ivf_pq_v3( + #[case] nlist: usize, + #[case] distance_type: DistanceType, + #[case] recall_requirement: f32, + ) { + let ivf_params = IvfBuildParams::new(nlist); + let pq_params = PQBuildParams::default(); + let params = VectorIndexParams::with_ivf_pq_params_v3(distance_type, ivf_params, pq_params); + test_index(params, nlist, recall_requirement).await; + } + #[rstest] #[case(4, DistanceType::L2, 0.9)] #[case(4, DistanceType::Cosine, 0.9)]