Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add configuration to create v3 ivf_pq indices via python #2941

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions java/core/lance-jni/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ pub fn get_index_params(
Some(VectorIndexParams {
metric_type: distance_type,
stages,
force_use_new_index_format: None,
})
} else {
None
Expand Down
7 changes: 7 additions & 0 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1449,6 +1449,7 @@ def create_index(
storage_options: Optional[Dict[str, str]] = None,
filter_nan: bool = True,
one_pass_ivfpq: bool = False,
use_new_vector_index_format: Optional[bool] = None,
**kwargs,
) -> LanceDataset:
"""Create index on column.
Expand Down Expand Up @@ -1511,6 +1512,9 @@ def create_index(
for nullable columns. Obtains a small speed boost.
one_pass_ivfpq: bool
Defaults to False. If enabled, index type must be "IVF_PQ". Reduces disk IO.
use_new_vector_index_format : bool, optional
If True, build the index using the new V3 index format. Defaults to None,
which is treated as False. Only applies to IVF_PQ indices, which still
default to the legacy format.
kwargs :
Parameters passed to the index building process.

Expand Down Expand Up @@ -1900,6 +1904,9 @@ def create_index(
)
kwargs["pq_codebook"] = pq_codebook_batch

if index_type == "IVF_PQ" and use_new_vector_index_format is not None:
kwargs["use_new_vector_index_format"] = use_new_vector_index_format

if shuffle_partition_batches is not None:
kwargs["shuffle_partition_batches"] = shuffle_partition_batches
if shuffle_partition_concurrency is not None:
Expand Down
57 changes: 57 additions & 0 deletions python/python/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from lance.commit import CommitConflictError
from lance.debug import format_fragment
from lance.util import validate_vector_index
from lance.file import LanceFileReader

# Various valid inputs for write_dataset
input_schema = pa.schema([pa.field("a", pa.float64()), pa.field("b", pa.int64())])
Expand Down Expand Up @@ -2358,6 +2359,12 @@ def test_count_index_rows(tmp_path: Path):
assert dataset.stats.index_stats(index_name)["num_unindexed_rows"] == 512
assert dataset.stats.index_stats(index_name)["num_indexed_rows"] == 512

indices = dataset.list_indices()
index_uuid = indices[0]["uuid"]
index_dir = base_dir / f"_indices/{index_uuid}"
index_path = os.listdir(index_dir)
assert len(index_path) == 1


def test_dataset_progress(tmp_path: Path):
data = pa.table({"a": range(10)})
Expand Down Expand Up @@ -2650,3 +2657,53 @@ def test_default_storage_version(tmp_path: Path):
sample_file = frag.to_json()["files"][0]
assert sample_file["file_major_version"] == EXPECTED_MAJOR_VERSION
assert sample_file["file_minor_version"] == EXPECTED_MINOR_VERSION


def test_ivf_index_creation_with_v3_version(tmp_path: Path):
    """Build an IVF_PQ index with ``use_new_vector_index_format=True`` and
    check the index row statistics and the version of the auxiliary file."""
    dims = 32
    num_rows = 512
    schema = pa.schema([pa.field("a", pa.list_(pa.float32(), dims), False)])
    values = pc.random(num_rows * dims).cast("float32")
    table = pa.Table.from_pydict(
        {"a": pa.FixedSizeListArray.from_arrays(values, dims)}, schema=schema
    )

    base_dir = tmp_path / "test"

    dataset = lance.write_dataset(table, base_dir)

    # assert a KeyError is raised for an index name that doesn't exist
    index_name = "a_idx"
    with pytest.raises(KeyError):
        dataset.stats.index_stats(index_name)["num_unindexed_rows"]
    with pytest.raises(KeyError):
        dataset.stats.index_stats(index_name)["num_indexed_rows"]

    # create the index with the V3 format and assert every row is indexed
    dataset.create_index(
        "a",
        "IVF_PQ",
        name=index_name,
        num_partitions=2,
        num_sub_vectors=1,
        use_new_vector_index_format=True,
    )
    assert dataset.stats.index_stats(index_name)["num_unindexed_rows"] == 0
    assert dataset.stats.index_stats(index_name)["num_indexed_rows"] == num_rows

    # append some data; use `dims` so the rows always match the schema width
    new_table = pa.Table.from_pydict(
        {"a": [[float(i) for i in range(dims)] for _ in range(num_rows)]},
        schema=schema,
    )
    dataset = lance.write_dataset(new_table, base_dir, mode="append")

    # assert rows added since the index was created are reported as unindexed
    assert dataset.stats.index_stats(index_name)["num_unindexed_rows"] == num_rows
    assert dataset.stats.index_stats(index_name)["num_indexed_rows"] == num_rows

    # the auxiliary storage file of the index must report file version 0.3
    indices = dataset.list_indices()
    index_uuid = indices[0]["uuid"]
    storage_path = base_dir / f"_indices/{index_uuid}/auxiliary.idx"
    reader = LanceFileReader(str(storage_path))
    metadata = reader.metadata()
    assert metadata.major_version == 0
    assert metadata.minor_version == 3
19 changes: 16 additions & 3 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1564,6 +1564,7 @@ fn prepare_vector_index_params(
let mut hnsw_params = HnswBuildParams::default();
let mut pq_params = PQBuildParams::default();
let mut sq_params = SQBuildParams::default();
let mut use_new_vector_index_format = false;

if let Some(kwargs) = kwargs {
// Parse metric type
Expand Down Expand Up @@ -1623,6 +1624,10 @@ fn prepare_vector_index_params(
ivf_params.storage_options = Some(storage_options);
}

if let Some(use_new_format) = kwargs.get_item("use_new_vector_index_format")? {
use_new_vector_index_format = use_new_format.extract::<bool>()?;
};

match (
kwargs.get_item("precomputed_shuffle_buffers")?,
kwargs.get_item("precomputed_shuffle_buffers_path")?
Expand Down Expand Up @@ -1683,9 +1688,17 @@ fn prepare_vector_index_params(
}

match index_type {
"IVF_PQ" => Ok(Box::new(VectorIndexParams::with_ivf_pq_params(
m_type, ivf_params, pq_params,
))),
"IVF_PQ" => {
if use_new_vector_index_format {
Ok(Box::new(VectorIndexParams::with_ivf_pq_params_v3(
m_type, ivf_params, pq_params,
)))
} else {
Ok(Box::new(VectorIndexParams::with_ivf_pq_params(
m_type, ivf_params, pq_params,
)))
}
}

"IVF_HNSW_PQ" => Ok(Box::new(VectorIndexParams::with_ivf_hnsw_pq_params(
m_type,
Expand Down
57 changes: 47 additions & 10 deletions rust/lance/src/index/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ pub struct VectorIndexParams {

/// Vector distance metrics type.
pub metric_type: MetricType,

/// Whether to build the index with the V3 index builder.
/// `None` is treated as `false`. Only consulted for IVF_PQ indices, since
/// IVF_PQ still creates the old index format by default.
pub force_use_new_index_format: Option<bool>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use an enum to specify which version to use? We will likely have new index formats down the road.

Copy link
Contributor Author

@ankitvij-db ankitvij-db Sep 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I initially thought of doing that and just passing the LanceVersion; however, I was not sure how it ties to the index builder. I can change it to the Lance version if that makes it clearer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. I would prefer an index_file_version parameter over a use_new_index_format flag.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are V1 and V3 if you are going to create that enum
V2 has been removed, no index is in V2 format now.

}

impl VectorIndexParams {
Expand All @@ -74,6 +78,7 @@ impl VectorIndexParams {
Self {
stages,
metric_type,
force_use_new_index_format: None,
}
}

Expand Down Expand Up @@ -106,6 +111,7 @@ impl VectorIndexParams {
Self {
stages,
metric_type,
force_use_new_index_format: None,
}
}

Expand All @@ -119,9 +125,21 @@ impl VectorIndexParams {
Self {
stages,
metric_type,
force_use_new_index_format: None,
}
}

/// Create index parameters with `IVF` and `PQ` parameters for V3 Index, respectively.
pub fn with_ivf_pq_params_v3(
metric_type: MetricType,
ivf: IvfBuildParams,
pq: PQBuildParams,
) -> Self {
let mut params = Self::with_ivf_pq_params(metric_type, ivf, pq);
params.force_use_new_index_format = Some(true);
params
}

/// Create index parameters with `IVF`, `PQ` and `HNSW` parameters, respectively.
/// This is used for `IVF_HNSW_PQ` index.
pub fn with_ivf_hnsw_pq_params(
Expand All @@ -138,6 +156,7 @@ impl VectorIndexParams {
Self {
stages,
metric_type,
force_use_new_index_format: None,
}
}

Expand All @@ -157,6 +176,7 @@ impl VectorIndexParams {
Self {
stages,
metric_type,
force_use_new_index_format: None,
}
}
}
Expand Down Expand Up @@ -252,16 +272,33 @@ pub(crate) async fn build_vector_index(
});
};

build_ivf_pq_index(
dataset,
column,
name,
uuid,
params.metric_type,
ivf_params,
pq_params,
)
.await?;
let use_v3_index_format = params.force_use_new_index_format.unwrap_or(false);

if use_v3_index_format {
IvfIndexBuilder::<FlatIndex, ProductQuantizer>::new(
dataset.clone(),
column.to_owned(),
dataset.indices_dir().child(uuid),
params.metric_type,
Box::new(shuffler),
Some(ivf_params.clone()),
Some(pq_params.clone()),
(),
)?
.build()
.await?;
} else {
build_ivf_pq_index(
dataset,
column,
name,
uuid,
params.metric_type,
ivf_params,
pq_params,
)
.await?;
}
} else if is_ivf_hnsw(stages) {
let len = stages.len();
let StageParams::Hnsw(hnsw_params) = &stages[1] else {
Expand Down
Loading
Loading