diff --git a/crates/redis_interface/src/commands.rs b/crates/redis_interface/src/commands.rs index 19497d6fbb83..3c7ffa16ada3 100644 --- a/crates/redis_interface/src/commands.rs +++ b/crates/redis_interface/src/commands.rs @@ -211,18 +211,13 @@ impl super::RedisConnectionPool { #[instrument(level = "DEBUG", skip(self))] pub async fn delete_multiple_keys( &self, - keys: Vec, + keys: &[String], ) -> CustomResult, errors::RedisError> { - let mut del_result = Vec::with_capacity(keys.len()); + let futures = keys.iter().map(|key| self.pool.del(self.add_prefix(key))); - for key in keys { - del_result.push( - self.pool - .del(self.add_prefix(&key)) - .await - .change_context(errors::RedisError::DeleteFailed)?, - ); - } + let del_result = futures::future::try_join_all(futures) + .await + .change_context(errors::RedisError::DeleteFailed)?; Ok(del_result) } diff --git a/crates/router/src/core/admin.rs b/crates/router/src/core/admin.rs index 24a2b25d6ce9..19c3e6c1e228 100644 --- a/crates/router/src/core/admin.rs +++ b/crates/router/src/core/admin.rs @@ -4288,7 +4288,7 @@ impl ProfileWrapper { .change_context(errors::ApiErrorResponse::InternalServerError) .attach_printable("Failed to update routing algorithm ref in business profile")?; - storage_impl::redis::cache::publish_into_redact_channel( + storage_impl::redis::cache::redact_from_redis_and_publish( db.get_cache_store().as_ref(), [routing_cache_key], ) diff --git a/crates/router/src/core/cache.rs b/crates/router/src/core/cache.rs index 8cda60cf7009..fbe75f3c9c0f 100644 --- a/crates/router/src/core/cache.rs +++ b/crates/router/src/core/cache.rs @@ -1,6 +1,6 @@ use common_utils::errors::CustomResult; use error_stack::{report, ResultExt}; -use storage_impl::redis::cache::{publish_into_redact_channel, CacheKind}; +use storage_impl::redis::cache::{redact_from_redis_and_publish, CacheKind}; use super::errors; use crate::{routes::SessionState, services}; @@ -10,7 +10,7 @@ pub async fn invalidate( key: &str, ) -> CustomResult, errors::ApiErrorResponse> { let store = state.store.as_ref(); - let result = publish_into_redact_channel( + let result = redact_from_redis_and_publish( store.get_cache_store().as_ref(), [CacheKind::All(key.into())], ) diff --git a/crates/router/src/core/routing.rs b/crates/router/src/core/routing.rs index 717dfd0c6eba..99bd2b00209b 100644 --- a/crates/router/src/core/routing.rs +++ b/crates/router/src/core/routing.rs @@ -1383,7 +1383,7 @@ pub async fn success_based_routing_update_configs( let cache_entries_to_redact = vec![cache::CacheKind::SuccessBasedDynamicRoutingCache( cache_key.into(), )]; - let _ = cache::publish_into_redact_channel( + let _ = cache::redact_from_redis_and_publish( state.store.get_cache_store().as_ref(), cache_entries_to_redact, ) diff --git a/crates/router/src/core/routing/helpers.rs b/crates/router/src/core/routing/helpers.rs index 0d66c3b6f17b..159def38621a 100644 --- a/crates/router/src/core/routing/helpers.rs +++ b/crates/router/src/core/routing/helpers.rs @@ -189,7 +189,7 @@ pub async fn update_merchant_active_algorithm_ref( .change_context(errors::ApiErrorResponse::InternalServerError) .attach_printable("Failed to update routing algorithm ref in merchant account")?; - cache::publish_into_redact_channel(db.get_cache_store().as_ref(), [config_key]) + cache::redact_from_redis_and_publish(db.get_cache_store().as_ref(), [config_key]) .await .change_context(errors::ApiErrorResponse::InternalServerError) .attach_printable("Failed to invalidate the config cache")?; @@ -256,7 +256,7 @@ pub async fn update_profile_active_algorithm_ref( .change_context(errors::ApiErrorResponse::InternalServerError) .attach_printable("Failed to update routing algorithm ref in business profile")?; - cache::publish_into_redact_channel(db.get_cache_store().as_ref(), [routing_cache_key]) + cache::redact_from_redis_and_publish(db.get_cache_store().as_ref(), [routing_cache_key]) .await .change_context(errors::ApiErrorResponse::InternalServerError) .attach_printable("Failed to invalidate routing cache")?; @@ -1031,7 +1031,7 @@ pub async fn disable_dynamic_routing_algorithm( }; // redact cache for dynamic routing config - let _ = cache::publish_into_redact_channel( + let _ = cache::redact_from_redis_and_publish( state.store.get_cache_store().as_ref(), cache_entries_to_redact, ) diff --git a/crates/router/src/db/configs.rs b/crates/router/src/db/configs.rs index 575481793ca1..9b8ab5231b67 100644 --- a/crates/router/src/db/configs.rs +++ b/crates/router/src/db/configs.rs @@ -1,16 +1,13 @@ use diesel_models::configs::ConfigUpdateInternal; use error_stack::{report, ResultExt}; use router_env::{instrument, tracing}; -use storage_impl::redis::{ - cache::{self, CacheKind, CONFIG_CACHE}, - kv_store::RedisConnInterface, - pub_sub::PubSubInterface, -}; +use storage_impl::redis::cache::{self, CacheKind, CONFIG_CACHE}; use super::{MockDb, Store}; use crate::{ connection, core::errors::{self, CustomResult}, + db::StorageInterface, types::storage, }; @@ -69,14 +66,11 @@ impl ConfigInterface for Store { .await .map_err(|error| report!(errors::StorageError::from(error)))?; - self.get_redis_conn() - .map_err(Into::::into)? - .publish( - cache::IMC_INVALIDATION_CHANNEL, - CacheKind::Config((&inserted.key).into()), - ) - .await - .map_err(Into::::into)?; + cache::redact_from_redis_and_publish( + self.get_cache_store().as_ref(), + [CacheKind::Config((&inserted.key).into())], + ) + .await?; Ok(inserted) } @@ -177,14 +171,11 @@ impl ConfigInterface for Store { .await .map_err(|error| report!(errors::StorageError::from(error)))?; - self.get_redis_conn() - .map_err(Into::::into)? - .publish( - cache::IMC_INVALIDATION_CHANNEL, - CacheKind::Config(key.into()), - ) - .await - .map_err(Into::::into)?; + cache::redact_from_redis_and_publish( + self.get_cache_store().as_ref(), + [CacheKind::Config((&deleted.key).into())], + ) + .await?; Ok(deleted) } diff --git a/crates/router/src/db/merchant_account.rs b/crates/router/src/db/merchant_account.rs index 4f4a3f1cf00b..1c104b22489f 100644 --- a/crates/router/src/db/merchant_account.rs +++ b/crates/router/src/db/merchant_account.rs @@ -801,7 +801,7 @@ async fn publish_and_redact_merchant_account_cache( cache_keys.extend(publishable_key.into_iter()); cache_keys.extend(cgraph_key.into_iter()); - cache::publish_into_redact_channel(store.get_cache_store().as_ref(), cache_keys).await?; + cache::redact_from_redis_and_publish(store.get_cache_store().as_ref(), cache_keys).await?; Ok(()) } @@ -822,6 +822,6 @@ async fn publish_and_redact_all_merchant_account_cache( .map(|s| CacheKind::Accounts(s.into())) .collect(); - cache::publish_into_redact_channel(store.get_cache_store().as_ref(), cache_keys).await?; + cache::redact_from_redis_and_publish(store.get_cache_store().as_ref(), cache_keys).await?; Ok(()) } diff --git a/crates/storage_impl/src/redis/cache.rs b/crates/storage_impl/src/redis/cache.rs index 93255fac9144..323d3d6df259 100644 --- a/crates/storage_impl/src/redis/cache.rs +++ b/crates/storage_impl/src/redis/cache.rs @@ -2,14 +2,17 @@ use std::{any::Any, borrow::Cow, fmt::Debug, sync::Arc}; use common_utils::{ errors::{self, CustomResult}, - ext_traits::{AsyncExt, ByteSliceExt}, + ext_traits::ByteSliceExt, }; use dyn_clone::DynClone; use error_stack::{Report, ResultExt}; use moka::future::Cache as MokaCache; use once_cell::sync::Lazy; use redis_interface::{errors::RedisError, RedisConnectionPool, RedisValue}; -use router_env::tracing::{self, instrument}; +use router_env::{ + logger, + tracing::{self, instrument}, +}; use crate::{ errors::StorageError, @@ -100,7 +103,7 @@ pub struct CacheRedact<'a> { pub kind: CacheKind<'a>, } -#[derive(serde::Serialize, serde::Deserialize)] +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub enum CacheKind<'a> { Config(Cow<'a, str>), Accounts(Cow<'a, str>), @@ -114,6 +117,23 @@ pub enum CacheKind<'a> { All(Cow<'a, str>), } +impl CacheKind<'_> { + pub(crate) fn get_key_without_prefix(&self) -> &str { + match self { + CacheKind::Config(key) + | CacheKind::Accounts(key) + | CacheKind::Routing(key) + | CacheKind::DecisionManager(key) + | CacheKind::Surcharge(key) + | CacheKind::CGraph(key) + | CacheKind::SuccessBasedDynamicRoutingCache(key) + | CacheKind::EliminationBasedDynamicRoutingCache(key) + | CacheKind::PmFiltersCGraph(key) + | CacheKind::All(key) => key, + } + } +} + impl<'a> TryFrom> for RedisValue { type Error = Report; fn try_from(v: CacheRedact<'a>) -> Result { @@ -343,48 +363,37 @@ where } #[instrument(skip_all)] -pub async fn redact_cache( +pub async fn redact_from_redis_and_publish< + 'a, + K: IntoIterator> + Send + Clone, +>( store: &(dyn RedisConnInterface + Send + Sync), - key: &'static str, - fun: F, - in_memory: Option<&Cache>, -) -> CustomResult -where - F: FnOnce() -> Fut + Send, - Fut: futures::Future> + Send, -{ - let data = fun().await?; - + keys: K, +) -> CustomResult { let redis_conn = store .get_redis_conn() .change_context(StorageError::RedisError( RedisError::RedisConnectionError.into(), )) .attach_printable("Failed to get redis connection")?; - let tenant_key = CacheKey { - key: key.to_string(), - prefix: redis_conn.key_prefix.clone(), - }; - in_memory.async_map(|cache| cache.remove(tenant_key)).await; - redis_conn - .delete_key(key) + let redis_keys_to_be_deleted = keys + .clone() + .into_iter() + .map(|val| val.get_key_without_prefix().to_owned()) + .collect::>(); + + let del_replies = redis_conn + .delete_multiple_keys(&redis_keys_to_be_deleted) .await - .change_context(StorageError::KVError)?; - Ok(data) -} + .map_err(StorageError::RedisError)?; -#[instrument(skip_all)] -pub async fn publish_into_redact_channel<'a, K: IntoIterator> + Send>( - store: &(dyn RedisConnInterface + Send + Sync), - keys: K, -) -> CustomResult { - let redis_conn = store - .get_redis_conn() - .change_context(StorageError::RedisError( - RedisError::RedisConnectionError.into(), - )) - .attach_printable("Failed to get redis connection")?; + let deletion_result = redis_keys_to_be_deleted + .into_iter() + .zip(del_replies) + .collect::>(); + + logger::debug!(redis_deletion_result=?deletion_result); let futures = keys.into_iter().map(|key| async { redis_conn @@ -411,7 +420,7 @@ where Fut: futures::Future> + Send, { let data = fun().await?; - publish_into_redact_channel(store, [key]).await?; + redact_from_redis_and_publish(store, [key]).await?; Ok(data) } @@ -424,10 +433,10 @@ pub async fn publish_and_redact_multiple<'a, T, F, Fut, K>( where F: FnOnce() -> Fut + Send, Fut: futures::Future> + Send, - K: IntoIterator> + Send, + K: IntoIterator> + Send + Clone, { let data = fun().await?; - publish_into_redact_channel(store, keys).await?; + redact_from_redis_and_publish(store, keys).await?; Ok(data) } diff --git a/crates/storage_impl/src/redis/pub_sub.rs b/crates/storage_impl/src/redis/pub_sub.rs index 42ad2ae0795a..373ac370e2fe 100644 --- a/crates/storage_impl/src/redis/pub_sub.rs +++ b/crates/storage_impl/src/redis/pub_sub.rs @@ -243,11 +243,6 @@ impl PubSubInterface for std::sync::Arc { } }; - self.delete_key(key.as_ref()) - .await - .map_err(|err| logger::error!("Error while deleting redis key: {err:?}")) - .ok(); - logger::debug!( key_prefix=?message.tenant.clone(), channel_name=?channel_name,