Skip to content

Commit

Permalink
fix(cache): address in-memory cache invalidation using global tenant …
Browse files Browse the repository at this point in the history
…as `key_prefix` (#6976)
  • Loading branch information
Chethan-rao authored Jan 3, 2025
1 parent 7d00583 commit fce5ffa
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 83 deletions.
15 changes: 5 additions & 10 deletions crates/redis_interface/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,18 +211,13 @@ impl super::RedisConnectionPool {
#[instrument(level = "DEBUG", skip(self))]
pub async fn delete_multiple_keys(
&self,
keys: Vec<String>,
keys: &[String],
) -> CustomResult<Vec<DelReply>, errors::RedisError> {
let mut del_result = Vec::with_capacity(keys.len());
let futures = keys.iter().map(|key| self.pool.del(self.add_prefix(key)));

for key in keys {
del_result.push(
self.pool
.del(self.add_prefix(&key))
.await
.change_context(errors::RedisError::DeleteFailed)?,
);
}
let del_result = futures::future::try_join_all(futures)
.await
.change_context(errors::RedisError::DeleteFailed)?;

Ok(del_result)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/router/src/core/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4288,7 +4288,7 @@ impl ProfileWrapper {
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to update routing algorithm ref in business profile")?;

storage_impl::redis::cache::publish_into_redact_channel(
storage_impl::redis::cache::redact_from_redis_and_publish(
db.get_cache_store().as_ref(),
[routing_cache_key],
)
Expand Down
4 changes: 2 additions & 2 deletions crates/router/src/core/cache.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use common_utils::errors::CustomResult;
use error_stack::{report, ResultExt};
use storage_impl::redis::cache::{publish_into_redact_channel, CacheKind};
use storage_impl::redis::cache::{redact_from_redis_and_publish, CacheKind};

use super::errors;
use crate::{routes::SessionState, services};
Expand All @@ -10,7 +10,7 @@ pub async fn invalidate(
key: &str,
) -> CustomResult<services::api::ApplicationResponse<serde_json::Value>, errors::ApiErrorResponse> {
let store = state.store.as_ref();
let result = publish_into_redact_channel(
let result = redact_from_redis_and_publish(
store.get_cache_store().as_ref(),
[CacheKind::All(key.into())],
)
Expand Down
2 changes: 1 addition & 1 deletion crates/router/src/core/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1383,7 +1383,7 @@ pub async fn success_based_routing_update_configs(
let cache_entries_to_redact = vec![cache::CacheKind::SuccessBasedDynamicRoutingCache(
cache_key.into(),
)];
let _ = cache::publish_into_redact_channel(
let _ = cache::redact_from_redis_and_publish(
state.store.get_cache_store().as_ref(),
cache_entries_to_redact,
)
Expand Down
6 changes: 3 additions & 3 deletions crates/router/src/core/routing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ pub async fn update_merchant_active_algorithm_ref(
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to update routing algorithm ref in merchant account")?;

cache::publish_into_redact_channel(db.get_cache_store().as_ref(), [config_key])
cache::redact_from_redis_and_publish(db.get_cache_store().as_ref(), [config_key])
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to invalidate the config cache")?;
Expand Down Expand Up @@ -256,7 +256,7 @@ pub async fn update_profile_active_algorithm_ref(
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to update routing algorithm ref in business profile")?;

cache::publish_into_redact_channel(db.get_cache_store().as_ref(), [routing_cache_key])
cache::redact_from_redis_and_publish(db.get_cache_store().as_ref(), [routing_cache_key])
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to invalidate routing cache")?;
Expand Down Expand Up @@ -1031,7 +1031,7 @@ pub async fn disable_dynamic_routing_algorithm(
};

// redact cache for dynamic routing config
let _ = cache::publish_into_redact_channel(
let _ = cache::redact_from_redis_and_publish(
state.store.get_cache_store().as_ref(),
cache_entries_to_redact,
)
Expand Down
33 changes: 12 additions & 21 deletions crates/router/src/db/configs.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
use diesel_models::configs::ConfigUpdateInternal;
use error_stack::{report, ResultExt};
use router_env::{instrument, tracing};
use storage_impl::redis::{
cache::{self, CacheKind, CONFIG_CACHE},
kv_store::RedisConnInterface,
pub_sub::PubSubInterface,
};
use storage_impl::redis::cache::{self, CacheKind, CONFIG_CACHE};

use super::{MockDb, Store};
use crate::{
connection,
core::errors::{self, CustomResult},
db::StorageInterface,
types::storage,
};

Expand Down Expand Up @@ -69,14 +66,11 @@ impl ConfigInterface for Store {
.await
.map_err(|error| report!(errors::StorageError::from(error)))?;

self.get_redis_conn()
.map_err(Into::<errors::StorageError>::into)?
.publish(
cache::IMC_INVALIDATION_CHANNEL,
CacheKind::Config((&inserted.key).into()),
)
.await
.map_err(Into::<errors::StorageError>::into)?;
cache::redact_from_redis_and_publish(
self.get_cache_store().as_ref(),
[CacheKind::Config((&inserted.key).into())],
)
.await?;

Ok(inserted)
}
Expand Down Expand Up @@ -177,14 +171,11 @@ impl ConfigInterface for Store {
.await
.map_err(|error| report!(errors::StorageError::from(error)))?;

self.get_redis_conn()
.map_err(Into::<errors::StorageError>::into)?
.publish(
cache::IMC_INVALIDATION_CHANNEL,
CacheKind::Config(key.into()),
)
.await
.map_err(Into::<errors::StorageError>::into)?;
cache::redact_from_redis_and_publish(
self.get_cache_store().as_ref(),
[CacheKind::Config((&deleted.key).into())],
)
.await?;

Ok(deleted)
}
Expand Down
4 changes: 2 additions & 2 deletions crates/router/src/db/merchant_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,7 @@ async fn publish_and_redact_merchant_account_cache(
cache_keys.extend(publishable_key.into_iter());
cache_keys.extend(cgraph_key.into_iter());

cache::publish_into_redact_channel(store.get_cache_store().as_ref(), cache_keys).await?;
cache::redact_from_redis_and_publish(store.get_cache_store().as_ref(), cache_keys).await?;
Ok(())
}

Expand All @@ -822,6 +822,6 @@ async fn publish_and_redact_all_merchant_account_cache(
.map(|s| CacheKind::Accounts(s.into()))
.collect();

cache::publish_into_redact_channel(store.get_cache_store().as_ref(), cache_keys).await?;
cache::redact_from_redis_and_publish(store.get_cache_store().as_ref(), cache_keys).await?;
Ok(())
}
85 changes: 47 additions & 38 deletions crates/storage_impl/src/redis/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ use std::{any::Any, borrow::Cow, fmt::Debug, sync::Arc};

use common_utils::{
errors::{self, CustomResult},
ext_traits::{AsyncExt, ByteSliceExt},
ext_traits::ByteSliceExt,
};
use dyn_clone::DynClone;
use error_stack::{Report, ResultExt};
use moka::future::Cache as MokaCache;
use once_cell::sync::Lazy;
use redis_interface::{errors::RedisError, RedisConnectionPool, RedisValue};
use router_env::tracing::{self, instrument};
use router_env::{
logger,
tracing::{self, instrument},
};

use crate::{
errors::StorageError,
Expand Down Expand Up @@ -100,7 +103,7 @@ pub struct CacheRedact<'a> {
pub kind: CacheKind<'a>,
}

#[derive(serde::Serialize, serde::Deserialize)]
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub enum CacheKind<'a> {
Config(Cow<'a, str>),
Accounts(Cow<'a, str>),
Expand All @@ -114,6 +117,23 @@ pub enum CacheKind<'a> {
All(Cow<'a, str>),
}

impl CacheKind<'_> {
pub(crate) fn get_key_without_prefix(&self) -> &str {
match self {
CacheKind::Config(key)
| CacheKind::Accounts(key)
| CacheKind::Routing(key)
| CacheKind::DecisionManager(key)
| CacheKind::Surcharge(key)
| CacheKind::CGraph(key)
| CacheKind::SuccessBasedDynamicRoutingCache(key)
| CacheKind::EliminationBasedDynamicRoutingCache(key)
| CacheKind::PmFiltersCGraph(key)
| CacheKind::All(key) => key,
}
}
}

impl<'a> TryFrom<CacheRedact<'a>> for RedisValue {
type Error = Report<errors::ValidationError>;
fn try_from(v: CacheRedact<'a>) -> Result<Self, Self::Error> {
Expand Down Expand Up @@ -343,48 +363,37 @@ where
}

#[instrument(skip_all)]
pub async fn redact_cache<T, F, Fut>(
pub async fn redact_from_redis_and_publish<
'a,
K: IntoIterator<Item = CacheKind<'a>> + Send + Clone,
>(
store: &(dyn RedisConnInterface + Send + Sync),
key: &'static str,
fun: F,
in_memory: Option<&Cache>,
) -> CustomResult<T, StorageError>
where
F: FnOnce() -> Fut + Send,
Fut: futures::Future<Output = CustomResult<T, StorageError>> + Send,
{
let data = fun().await?;

keys: K,
) -> CustomResult<usize, StorageError> {
let redis_conn = store
.get_redis_conn()
.change_context(StorageError::RedisError(
RedisError::RedisConnectionError.into(),
))
.attach_printable("Failed to get redis connection")?;
let tenant_key = CacheKey {
key: key.to_string(),
prefix: redis_conn.key_prefix.clone(),
};
in_memory.async_map(|cache| cache.remove(tenant_key)).await;

redis_conn
.delete_key(key)
let redis_keys_to_be_deleted = keys
.clone()
.into_iter()
.map(|val| val.get_key_without_prefix().to_owned())
.collect::<Vec<_>>();

let del_replies = redis_conn
.delete_multiple_keys(&redis_keys_to_be_deleted)
.await
.change_context(StorageError::KVError)?;
Ok(data)
}
.map_err(StorageError::RedisError)?;

#[instrument(skip_all)]
pub async fn publish_into_redact_channel<'a, K: IntoIterator<Item = CacheKind<'a>> + Send>(
store: &(dyn RedisConnInterface + Send + Sync),
keys: K,
) -> CustomResult<usize, StorageError> {
let redis_conn = store
.get_redis_conn()
.change_context(StorageError::RedisError(
RedisError::RedisConnectionError.into(),
))
.attach_printable("Failed to get redis connection")?;
let deletion_result = redis_keys_to_be_deleted
.into_iter()
.zip(del_replies)
.collect::<Vec<_>>();

logger::debug!(redis_deletion_result=?deletion_result);

let futures = keys.into_iter().map(|key| async {
redis_conn
Expand All @@ -411,7 +420,7 @@ where
Fut: futures::Future<Output = CustomResult<T, StorageError>> + Send,
{
let data = fun().await?;
publish_into_redact_channel(store, [key]).await?;
redact_from_redis_and_publish(store, [key]).await?;
Ok(data)
}

Expand All @@ -424,10 +433,10 @@ pub async fn publish_and_redact_multiple<'a, T, F, Fut, K>(
where
F: FnOnce() -> Fut + Send,
Fut: futures::Future<Output = CustomResult<T, StorageError>> + Send,
K: IntoIterator<Item = CacheKind<'a>> + Send,
K: IntoIterator<Item = CacheKind<'a>> + Send + Clone,
{
let data = fun().await?;
publish_into_redact_channel(store, keys).await?;
redact_from_redis_and_publish(store, keys).await?;
Ok(data)
}

Expand Down
5 changes: 0 additions & 5 deletions crates/storage_impl/src/redis/pub_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,6 @@ impl PubSubInterface for std::sync::Arc<redis_interface::RedisConnectionPool> {
}
};

self.delete_key(key.as_ref())
.await
.map_err(|err| logger::error!("Error while deleting redis key: {err:?}"))
.ok();

logger::debug!(
key_prefix=?message.tenant.clone(),
channel_name=?channel_name,
Expand Down

0 comments on commit fce5ffa

Please sign in to comment.