From 685fe43b03331b00d1b4be671e1b18aac4d699b3 Mon Sep 17 00:00:00 2001
From: Bert
Date: Thu, 4 Jul 2024 16:02:55 -0300
Subject: [PATCH] fix: avoid multiple threads loading same index partition (#2559)

Adds locks for loading of index partitions. This avoids multiple threads
that are querying concurrently from each allocating memory to load the same
partition, which can cause memory issues during application startup.
---
 python/python/tests/test_vector_index.py |  2 +-
 rust/lance/src/index/vector/ivf.rs       | 65 +++++++++++--------
 rust/lance/src/index/vector/ivf/v2.rs    | 79 ++++++++++++++----------
 rust/lance/src/index/vector/utils.rs     | 22 +++++++
 4 files changed, 110 insertions(+), 58 deletions(-)

diff --git a/python/python/tests/test_vector_index.py b/python/python/tests/test_vector_index.py
index 26e0a4ab2d..8715bbc2fc 100644
--- a/python/python/tests/test_vector_index.py
+++ b/python/python/tests/test_vector_index.py
@@ -605,7 +605,7 @@ def query_index(ds, ntimes, q=None):
     indexed_dataset = lance.dataset(tmp_path / "test", index_cache_size=1)
 
     # query using the same vector, we should get a very high hit rate
-    query_index(indexed_dataset, 100, q=rng.standard_normal(16))
+    query_index(indexed_dataset, 200, q=rng.standard_normal(16))
     assert indexed_dataset._ds.index_cache_hit_rate() > 0.99
 
     last_hit_rate = indexed_dataset._ds.index_cache_hit_rate()
diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs
index 96f1f3943d..d2a720819f 100644
--- a/rust/lance/src/index/vector/ivf.rs
+++ b/rust/lance/src/index/vector/ivf.rs
@@ -73,7 +73,7 @@ use snafu::{location, Location};
 use tracing::instrument;
 use uuid::Uuid;
 
-use super::builder::IvfIndexBuilder;
+use super::{builder::IvfIndexBuilder, utils::PartitionLoadLock};
 use super::{
     pq::{build_pq_model, PQIndex},
     utils::maybe_sample_training_data,
@@ -106,6 +106,8 @@ pub struct IVFIndex {
     /// Index in each partition.
     sub_index: Arc<dyn VectorIndex>,
 
+    partition_locks: PartitionLoadLock,
+
     metric_type: MetricType,
 
     // The session cache holds an Arc to this object so we need to
@@ -139,6 +141,8 @@ impl IVFIndex {
                 location: location!(),
             });
         }
+
+        let num_partitions = ivf.num_partitions();
         Ok(Self {
             uuid: uuid.to_owned(),
             session: Arc::downgrade(&session),
@@ -146,6 +150,7 @@ impl IVFIndex {
             reader,
             sub_index,
             metric_type,
+            partition_locks: PartitionLoadLock::new(num_partitions),
         })
     }
 
@@ -170,32 +175,40 @@ impl IVFIndex {
         let part_index = if let Some(part_idx) = session.index_cache.get_vector(&cache_key) {
             part_idx
         } else {
-            if partition_id >= self.ivf.num_partitions() {
-                return Err(Error::Index {
-                    message: format!(
-                        "partition id {} is out of range of {} partitions",
-                        partition_id,
-                        self.ivf.num_partitions()
-                    ),
-                    location: location!(),
-                });
-            }
+            let mtx = self.partition_locks.get_partition_mutex(partition_id);
+            let _guard = mtx.lock().await;
+            // check the cache again, as the partition may have been loaded by another
+            // thread that held the lock on loading the partition
+            if let Some(part_idx) = session.index_cache.get_vector(&cache_key) {
+                part_idx
+            } else {
+                if partition_id >= self.ivf.num_partitions() {
+                    return Err(Error::Index {
+                        message: format!(
+                            "partition id {} is out of range of {} partitions",
+                            partition_id,
+                            self.ivf.num_partitions()
+                        ),
+                        location: location!(),
+                    });
+                }
 
-            let range = self.ivf.row_range(partition_id);
-            let idx = self
-                .sub_index
-                .load_partition(
-                    self.reader.clone(),
-                    range.start,
-                    range.end - range.start,
-                    partition_id,
-                )
-                .await?;
-            let idx: Arc<dyn VectorIndex> = idx.into();
-            if write_cache {
-                session.index_cache.insert_vector(&cache_key, idx.clone());
+                let range = self.ivf.row_range(partition_id);
+                let idx = self
+                    .sub_index
+                    .load_partition(
+                        self.reader.clone(),
+                        range.start,
+                        range.end - range.start,
+                        partition_id,
+                    )
+                    .await?;
+                let idx: Arc<dyn VectorIndex> = idx.into();
+                if write_cache {
+                    session.index_cache.insert_vector(&cache_key, idx.clone());
+                }
+                idx
             }
-            idx
         };
         Ok(part_index)
     }
@@ -232,7 +245,7 @@ pub(crate) async fn optimize_vector_indices(
     existing_indices: &[Arc<dyn Index>],
     options: &OptimizeOptions,
 ) -> Result<(Uuid, usize)> {
-    // Senity check the indices
+    // Sanity check the indices
     if existing_indices.is_empty() {
         return Err(Error::Index {
             message: "optimizing vector index: no existing index found".to_string(),
diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs
index 0c69eac153..98282d6198 100644
--- a/rust/lance/src/index/vector/ivf/v2.rs
+++ b/rust/lance/src/index/vector/ivf/v2.rs
@@ -51,7 +51,10 @@ use snafu::{location, Location};
 use tracing::instrument;
 
 use crate::{
-    index::{vector::VectorIndex, PreFilter},
+    index::{
+        vector::{utils::PartitionLoadLock, VectorIndex},
+        PreFilter,
+    },
     session::Session,
 };
 
@@ -78,6 +81,8 @@ pub struct IVFIndex {
     /// Index in each partition.
     partition_cache: Cache>>,
 
+    partition_locks: PartitionLoadLock,
+
     distance_type: DistanceType,
 
     // The session cache holds an Arc to this object so we need to
@@ -167,12 +172,14 @@
             .await?;
         let storage = IvfQuantizationStorage::try_new(storage_reader).await?;
 
+        let num_partitions = ivf.num_partitions();
         Ok(Self {
             uuid,
             ivf,
             reader: index_reader,
             storage,
             partition_cache: Cache::new(DEFAULT_INDEX_CACHE_SIZE as u64),
+            partition_locks: PartitionLoadLock::new(num_partitions),
             sub_index_metadata,
             distance_type,
             session,
@@ -201,38 +208,48 @@
                 });
             }
 
-            let schema = Arc::new(self.reader.schema().as_ref().into());
-            let batch = match self.reader.metadata().num_rows {
-                0 => RecordBatch::new_empty(schema),
-                _ => {
-                    let batches = self
-                        .reader
-                        .read_stream(
-                            ReadBatchParams::Range(self.ivf.row_range(partition_id)),
-                            u32::MAX,
-                            1,
-                            FilterExpression::no_filter(),
-                        )?
-                        .try_collect::<Vec<_>>()
-                        .await?;
-                    concat_batches(&schema, batches.iter())?
+            let mtx = self.partition_locks.get_partition_mutex(partition_id);
+            let _guard = mtx.lock().await;
+
+            // check the cache again, as the partition may have been loaded by another
+            // thread that held the lock on loading the partition
+            if let Some(part_idx) = self.partition_cache.get(&cache_key) {
+                part_idx
+            } else {
+                let schema = Arc::new(self.reader.schema().as_ref().into());
+                let batch = match self.reader.metadata().num_rows {
+                    0 => RecordBatch::new_empty(schema),
+                    _ => {
+                        let batches = self
+                            .reader
+                            .read_stream(
+                                ReadBatchParams::Range(self.ivf.row_range(partition_id)),
+                                u32::MAX,
+                                1,
+                                FilterExpression::no_filter(),
+                            )?
+                            .try_collect::<Vec<_>>()
+                            .await?;
+                        concat_batches(&schema, batches.iter())?
+                    }
+                };
+                let batch = batch.add_metadata(
+                    S::metadata_key().to_owned(),
+                    self.sub_index_metadata[partition_id].clone(),
+                )?;
+                let idx = S::load(batch)?;
+                let storage = self.load_partition_storage(partition_id).await?;
+                let partition_entry = Arc::new(PartitionEntry {
+                    index: idx,
+                    storage,
+                });
+                if write_cache {
+                    self.partition_cache
+                        .insert(cache_key.clone(), partition_entry.clone());
                 }
-            };
-            let batch = batch.add_metadata(
-                S::metadata_key().to_owned(),
-                self.sub_index_metadata[partition_id].clone(),
-            )?;
-            let idx = S::load(batch)?;
-            let storage = self.load_partition_storage(partition_id).await?;
-            let partition_entry = Arc::new(PartitionEntry {
-                index: idx,
-                storage,
-            });
-            if write_cache {
-                self.partition_cache
-                    .insert(cache_key.clone(), partition_entry.clone());
+
+                partition_entry
             }
-            partition_entry
         };
 
         Ok(part_entry)
diff --git a/rust/lance/src/index/vector/utils.rs b/rust/lance/src/index/vector/utils.rs
index 2dc1c92cd8..661877ed53 100644
--- a/rust/lance/src/index/vector/utils.rs
+++ b/rust/lance/src/index/vector/utils.rs
@@ -8,6 +8,7 @@ use arrow_schema::Schema as ArrowSchema;
 use arrow_select::concat::concat_batches;
 use futures::stream::TryStreamExt;
 use snafu::{location, Location};
+use tokio::sync::Mutex;
 
 use crate::dataset::Dataset;
 use crate::{Error, Result};
@@ -65,3 +66,24 @@ pub async fn maybe_sample_training_data(
     })?;
     Ok(array.as_fixed_size_list().clone())
 }
+
+#[derive(Debug)]
+pub struct PartitionLoadLock {
+    partition_locks: Vec<Arc<Mutex<()>>>,
+}
+
+impl PartitionLoadLock {
+    pub fn new(num_partitions: usize) -> Self {
+        Self {
+            partition_locks: (0..num_partitions)
+                .map(|_| Arc::new(Mutex::new(())))
+                .collect(),
+        }
+    }
+
+    pub fn get_partition_mutex(&self, partition_id: usize) -> Arc<Mutex<()>> {
+        let mtx = &self.partition_locks[partition_id];
+
+        mtx.clone()
+    }
+}
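
Reviewer note (not part of the patch): the change is a per-partition double-checked lock. Queries that miss the cache serialize on loading a given partition, while loads of different partitions stay independent. Below is a minimal, self-contained sketch of that pattern; Loader, its HashMap-based cache, and load_partition_data are illustrative stand-ins rather than Lance APIs, and only PartitionLoadLock mirrors the type added in utils.rs.

use std::collections::HashMap;
use std::sync::Arc;

use tokio::sync::Mutex;

// One async mutex per partition, mirroring the PartitionLoadLock added in utils.rs.
struct PartitionLoadLock {
    partition_locks: Vec<Arc<Mutex<()>>>,
}

impl PartitionLoadLock {
    fn new(num_partitions: usize) -> Self {
        Self {
            partition_locks: (0..num_partitions)
                .map(|_| Arc::new(Mutex::new(())))
                .collect(),
        }
    }

    fn get_partition_mutex(&self, partition_id: usize) -> Arc<Mutex<()>> {
        self.partition_locks[partition_id].clone()
    }
}

// Illustrative stand-ins for the index cache and the partition data.
struct Loader {
    locks: PartitionLoadLock,
    cache: Mutex<HashMap<usize, Arc<Vec<u8>>>>,
}

impl Loader {
    fn new(num_partitions: usize) -> Self {
        Self {
            locks: PartitionLoadLock::new(num_partitions),
            cache: Mutex::new(HashMap::new()),
        }
    }

    async fn partition(&self, partition_id: usize) -> Arc<Vec<u8>> {
        // Fast path: partition already cached.
        if let Some(hit) = self.cache.lock().await.get(&partition_id) {
            return hit.clone();
        }
        // Slow path: serialize loads of this partition only.
        let mtx = self.locks.get_partition_mutex(partition_id);
        let _guard = mtx.lock().await;
        // Double-check: another task may have loaded it while we waited on the lock.
        if let Some(hit) = self.cache.lock().await.get(&partition_id) {
            return hit.clone();
        }
        let loaded = Arc::new(load_partition_data(partition_id).await);
        self.cache
            .lock()
            .await
            .insert(partition_id, loaded.clone());
        loaded
    }
}

// Stand-in for the expensive read + decode of a partition.
async fn load_partition_data(_partition_id: usize) -> Vec<u8> {
    vec![0u8; 1024]
}

#[tokio::main]
async fn main() {
    let loader = Arc::new(Loader::new(4));
    // Even if many tasks race on the same partition, only one performs the load;
    // tasks hitting other partitions are never blocked by it.
    let tasks: Vec<_> = (0..8)
        .map(|i| {
            let loader = loader.clone();
            tokio::spawn(async move { loader.partition(i % 4).await.len() })
        })
        .collect();
    for t in tasks {
        assert_eq!(t.await.unwrap(), 1024);
    }
}

Using one mutex per partition (rather than a single global lock) keeps the memory savings of deduplicated loads without turning partition loading into a global bottleneck.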