Skip to content

Commit

Permalink
fix: avoid multiple threads loading same index partition (#2559)
Browse files Browse the repository at this point in the history
Adds locks for loading of index partitions. This prevents multiple
concurrently querying threads from each allocating memory to load the same
partition, which can cause memory issues during application startup.
  • Loading branch information
albertlockett authored Jul 4, 2024
1 parent e57d283 commit 685fe43
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 58 deletions.
2 changes: 1 addition & 1 deletion python/python/tests/test_vector_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ def query_index(ds, ntimes, q=None):

indexed_dataset = lance.dataset(tmp_path / "test", index_cache_size=1)
# query using the same vector, we should get a very high hit rate
query_index(indexed_dataset, 100, q=rng.standard_normal(16))
query_index(indexed_dataset, 200, q=rng.standard_normal(16))
assert indexed_dataset._ds.index_cache_hit_rate() > 0.99

last_hit_rate = indexed_dataset._ds.index_cache_hit_rate()
Expand Down
65 changes: 39 additions & 26 deletions rust/lance/src/index/vector/ivf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ use snafu::{location, Location};
use tracing::instrument;
use uuid::Uuid;

use super::builder::IvfIndexBuilder;
use super::{builder::IvfIndexBuilder, utils::PartitionLoadLock};
use super::{
pq::{build_pq_model, PQIndex},
utils::maybe_sample_training_data,
Expand Down Expand Up @@ -106,6 +106,8 @@ pub struct IVFIndex {
/// Index in each partition.
sub_index: Arc<dyn VectorIndex>,

partition_locks: PartitionLoadLock,

metric_type: MetricType,

// The session cache holds an Arc to this object so we need to
Expand Down Expand Up @@ -139,13 +141,16 @@ impl IVFIndex {
location: location!(),
});
}

let num_partitions = ivf.num_partitions();
Ok(Self {
uuid: uuid.to_owned(),
session: Arc::downgrade(&session),
ivf,
reader,
sub_index,
metric_type,
partition_locks: PartitionLoadLock::new(num_partitions),
})
}

Expand All @@ -170,32 +175,40 @@ impl IVFIndex {
let part_index = if let Some(part_idx) = session.index_cache.get_vector(&cache_key) {
part_idx
} else {
if partition_id >= self.ivf.num_partitions() {
return Err(Error::Index {
message: format!(
"partition id {} is out of range of {} partitions",
partition_id,
self.ivf.num_partitions()
),
location: location!(),
});
}
let mtx = self.partition_locks.get_partition_mutex(partition_id);
let _guard = mtx.lock().await;
// check the cache again, as the partition may have been loaded by another
// thread that held the lock on loading the partition
if let Some(part_idx) = session.index_cache.get_vector(&cache_key) {
part_idx
} else {
if partition_id >= self.ivf.num_partitions() {
return Err(Error::Index {
message: format!(
"partition id {} is out of range of {} partitions",
partition_id,
self.ivf.num_partitions()
),
location: location!(),
});
}

let range = self.ivf.row_range(partition_id);
let idx = self
.sub_index
.load_partition(
self.reader.clone(),
range.start,
range.end - range.start,
partition_id,
)
.await?;
let idx: Arc<dyn VectorIndex> = idx.into();
if write_cache {
session.index_cache.insert_vector(&cache_key, idx.clone());
let range = self.ivf.row_range(partition_id);
let idx = self
.sub_index
.load_partition(
self.reader.clone(),
range.start,
range.end - range.start,
partition_id,
)
.await?;
let idx: Arc<dyn VectorIndex> = idx.into();
if write_cache {
session.index_cache.insert_vector(&cache_key, idx.clone());
}
idx
}
idx
};
Ok(part_index)
}
Expand Down Expand Up @@ -232,7 +245,7 @@ pub(crate) async fn optimize_vector_indices(
existing_indices: &[Arc<dyn Index>],
options: &OptimizeOptions,
) -> Result<(Uuid, usize)> {
// Senity check the indices
// Sanity check the indices
if existing_indices.is_empty() {
return Err(Error::Index {
message: "optimizing vector index: no existing index found".to_string(),
Expand Down
79 changes: 48 additions & 31 deletions rust/lance/src/index/vector/ivf/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ use snafu::{location, Location};
use tracing::instrument;

use crate::{
index::{vector::VectorIndex, PreFilter},
index::{
vector::{utils::PartitionLoadLock, VectorIndex},
PreFilter,
},
session::Session,
};

Expand All @@ -78,6 +81,8 @@ pub struct IVFIndex<S: IvfSubIndex + 'static, Q: Quantization + 'static> {
/// Index in each partition.
partition_cache: Cache<String, Arc<PartitionEntry<S, Q>>>,

partition_locks: PartitionLoadLock,

distance_type: DistanceType,

// The session cache holds an Arc to this object so we need to
Expand Down Expand Up @@ -167,12 +172,14 @@ impl<S: IvfSubIndex + 'static, Q: Quantization> IVFIndex<S, Q> {
.await?;
let storage = IvfQuantizationStorage::try_new(storage_reader).await?;

let num_partitions = ivf.num_partitions();
Ok(Self {
uuid,
ivf,
reader: index_reader,
storage,
partition_cache: Cache::new(DEFAULT_INDEX_CACHE_SIZE as u64),
partition_locks: PartitionLoadLock::new(num_partitions),
sub_index_metadata,
distance_type,
session,
Expand Down Expand Up @@ -201,38 +208,48 @@ impl<S: IvfSubIndex + 'static, Q: Quantization> IVFIndex<S, Q> {
});
}

let schema = Arc::new(self.reader.schema().as_ref().into());
let batch = match self.reader.metadata().num_rows {
0 => RecordBatch::new_empty(schema),
_ => {
let batches = self
.reader
.read_stream(
ReadBatchParams::Range(self.ivf.row_range(partition_id)),
u32::MAX,
1,
FilterExpression::no_filter(),
)?
.try_collect::<Vec<_>>()
.await?;
concat_batches(&schema, batches.iter())?
let mtx = self.partition_locks.get_partition_mutex(partition_id);
let _guard = mtx.lock().await;

// check the cache again, as the partition may have been loaded by another
// thread that held the lock on loading the partition
if let Some(part_idx) = self.partition_cache.get(&cache_key) {
part_idx
} else {
let schema = Arc::new(self.reader.schema().as_ref().into());
let batch = match self.reader.metadata().num_rows {
0 => RecordBatch::new_empty(schema),
_ => {
let batches = self
.reader
.read_stream(
ReadBatchParams::Range(self.ivf.row_range(partition_id)),
u32::MAX,
1,
FilterExpression::no_filter(),
)?
.try_collect::<Vec<_>>()
.await?;
concat_batches(&schema, batches.iter())?
}
};
let batch = batch.add_metadata(
S::metadata_key().to_owned(),
self.sub_index_metadata[partition_id].clone(),
)?;
let idx = S::load(batch)?;
let storage = self.load_partition_storage(partition_id).await?;
let partition_entry = Arc::new(PartitionEntry {
index: idx,
storage,
});
if write_cache {
self.partition_cache
.insert(cache_key.clone(), partition_entry.clone());
}
};
let batch = batch.add_metadata(
S::metadata_key().to_owned(),
self.sub_index_metadata[partition_id].clone(),
)?;
let idx = S::load(batch)?;
let storage = self.load_partition_storage(partition_id).await?;
let partition_entry = Arc::new(PartitionEntry {
index: idx,
storage,
});
if write_cache {
self.partition_cache
.insert(cache_key.clone(), partition_entry.clone());

partition_entry
}
partition_entry
};

Ok(part_entry)
Expand Down
22 changes: 22 additions & 0 deletions rust/lance/src/index/vector/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use arrow_schema::Schema as ArrowSchema;
use arrow_select::concat::concat_batches;
use futures::stream::TryStreamExt;
use snafu::{location, Location};
use tokio::sync::Mutex;

use crate::dataset::Dataset;
use crate::{Error, Result};
Expand Down Expand Up @@ -65,3 +66,24 @@ pub async fn maybe_sample_training_data(
})?;
Ok(array.as_fixed_size_list().clone())
}

/// A set of per-partition async mutexes used to serialize concurrent loads
/// of the same index partition, so only one task allocates memory for a
/// given partition at a time.
#[derive(Debug)]
pub struct PartitionLoadLock {
    // One lock per partition; the Vec index is the partition id.
    partition_locks: Vec<Arc<Mutex<()>>>,
}

impl PartitionLoadLock {
    /// Create one independent lock per partition.
    pub fn new(num_partitions: usize) -> Self {
        let partition_locks = (0..num_partitions)
            .map(|_| Arc::new(Mutex::new(())))
            .collect();
        Self { partition_locks }
    }

    /// Return the (shared) mutex guarding loads of `partition_id`.
    ///
    /// Panics if `partition_id >= num_partitions` — callers are expected to
    /// validate the partition id before acquiring the lock.
    pub fn get_partition_mutex(&self, partition_id: usize) -> Arc<Mutex<()>> {
        Arc::clone(&self.partition_locks[partition_id])
    }
}

0 comments on commit 685fe43

Please sign in to comment.