diff --git a/crates/redis_interface/src/lib.rs b/crates/redis_interface/src/lib.rs index dc4fd3bbc9cc..fa35039800bb 100644 --- a/crates/redis_interface/src/lib.rs +++ b/crates/redis_interface/src/lib.rs @@ -68,6 +68,7 @@ impl RedisClient { pub struct SubscriberClient { inner: fred::clients::SubscriberClient, + pub is_subscriber_handler_spawned: Arc, } impl SubscriberClient { @@ -83,7 +84,10 @@ impl SubscriberClient { .wait_for_connect() .await .change_context(errors::RedisError::RedisConnectionError)?; - Ok(Self { inner: client }) + Ok(Self { + inner: client, + is_subscriber_handler_spawned: Arc::new(atomic::AtomicBool::new(false)), + }) } } diff --git a/crates/router/src/db/configs.rs b/crates/router/src/db/configs.rs index fb399a7084f6..575481793ca1 100644 --- a/crates/router/src/db/configs.rs +++ b/crates/router/src/db/configs.rs @@ -72,7 +72,7 @@ impl ConfigInterface for Store { self.get_redis_conn() .map_err(Into::::into)? .publish( - cache::PUB_SUB_CHANNEL, + cache::IMC_INVALIDATION_CHANNEL, CacheKind::Config((&inserted.key).into()), ) .await @@ -179,7 +179,10 @@ impl ConfigInterface for Store { self.get_redis_conn() .map_err(Into::::into)? - .publish(cache::PUB_SUB_CHANNEL, CacheKind::Config(key.into())) + .publish( + cache::IMC_INVALIDATION_CHANNEL, + CacheKind::Config(key.into()), + ) .await .map_err(Into::::into)?; diff --git a/crates/router/src/services.rs b/crates/router/src/services.rs index 58b5b56fd218..38f859c48e79 100644 --- a/crates/router/src/services.rs +++ b/crates/router/src/services.rs @@ -74,7 +74,7 @@ pub async fn get_store( tenant, master_enc_key, cache_store, - storage_impl::redis::cache::PUB_SUB_CHANNEL, + storage_impl::redis::cache::IMC_INVALIDATION_CHANNEL, ) .await? }; diff --git a/crates/storage_impl/src/redis/cache.rs b/crates/storage_impl/src/redis/cache.rs index 46aa3f8aa962..036a8902bc38 100644 --- a/crates/storage_impl/src/redis/cache.rs +++ b/crates/storage_impl/src/redis/cache.rs @@ -21,7 +21,7 @@ use crate::{ }; /// Redis channel name used for publishing invalidation messages -pub const PUB_SUB_CHANNEL: &str = "hyperswitch_invalidate"; +pub const IMC_INVALIDATION_CHANNEL: &str = "hyperswitch_invalidate"; /// Prefix for config cache key const CONFIG_CACHE_PREFIX: &str = "config"; @@ -392,7 +392,7 @@ pub async fn publish_into_redact_channel<'a, K: IntoIterator { .await .change_context(redis_errors::RedisError::SubscribeError)?; - let redis_clone = self.clone(); - let _task_handle = tokio::spawn( - async move { - if let Err(pubsub_error) = redis_clone.on_message().await { - logger::error!(?pubsub_error); + // Spawn only one thread handling all the published messages to different channels + if self + .subscriber + .is_subscriber_handler_spawned + .compare_exchange( + false, + true, + atomic::Ordering::SeqCst, + atomic::Ordering::SeqCst, + ) + .is_ok() + { + let redis_clone = self.clone(); + let _task_handle = tokio::spawn( + async move { + if let Err(pubsub_error) = redis_clone.on_message().await { + logger::error!(?pubsub_error); + } } - } - .in_current_span(), - ); + .in_current_span(), + ); + } + Ok(()) } @@ -61,120 +77,131 @@ impl PubSubInterface for std::sync::Arc { logger::debug!("Started on message: {:?}", self.key_prefix); let mut rx = self.subscriber.on_message(); while let Ok(message) = rx.recv().await { - logger::debug!("Invalidating {message:?}"); - let key = match CacheKind::try_from(RedisValue::new(message.value)) - .change_context(redis_errors::RedisError::OnMessageError) - { - Ok(value) => value, - Err(err) => { - logger::error!(value_conversion_err=?err); - continue; - } - }; - - let key = match key { - CacheKind::Config(key) => { - CONFIG_CACHE - .remove(CacheKey { - key: key.to_string(), - prefix: self.key_prefix.clone(), - }) - .await; - key - } - CacheKind::Accounts(key) => { - ACCOUNTS_CACHE - .remove(CacheKey { - key: key.to_string(), - prefix: self.key_prefix.clone(), - }) - .await; - key - } - CacheKind::CGraph(key) => { - CGRAPH_CACHE - .remove(CacheKey { - key: key.to_string(), - prefix: self.key_prefix.clone(), - }) - .await; - key - } - CacheKind::Routing(key) => { - ROUTING_CACHE - .remove(CacheKey { - key: key.to_string(), - prefix: self.key_prefix.clone(), - }) - .await; - key - } - CacheKind::DecisionManager(key) => { - DECISION_MANAGER_CACHE - .remove(CacheKey { - key: key.to_string(), - prefix: self.key_prefix.clone(), - }) - .await; - key - } - CacheKind::Surcharge(key) => { - SURCHARGE_CACHE - .remove(CacheKey { - key: key.to_string(), - prefix: self.key_prefix.clone(), - }) - .await; - key - } - CacheKind::All(key) => { - CONFIG_CACHE - .remove(CacheKey { - key: key.to_string(), - prefix: self.key_prefix.clone(), - }) - .await; - ACCOUNTS_CACHE - .remove(CacheKey { - key: key.to_string(), - prefix: self.key_prefix.clone(), - }) - .await; - CGRAPH_CACHE - .remove(CacheKey { - key: key.to_string(), - prefix: self.key_prefix.clone(), - }) - .await; - ROUTING_CACHE - .remove(CacheKey { - key: key.to_string(), - prefix: self.key_prefix.clone(), - }) - .await; - DECISION_MANAGER_CACHE - .remove(CacheKey { - key: key.to_string(), - prefix: self.key_prefix.clone(), - }) - .await; - SURCHARGE_CACHE - .remove(CacheKey { - key: key.to_string(), - prefix: self.key_prefix.clone(), - }) - .await; - - key - } - }; + let channel_name = message.channel.to_string(); + logger::debug!("Received message on channel: {channel_name}"); + + match channel_name.as_str() { + super::cache::IMC_INVALIDATION_CHANNEL => { + let key = match CacheKind::try_from(RedisValue::new(message.value)) + .change_context(redis_errors::RedisError::OnMessageError) + { + Ok(value) => value, + Err(err) => { + logger::error!(value_conversion_err=?err); + continue; + } + }; + + let key = match key { + CacheKind::Config(key) => { + CONFIG_CACHE + .remove(CacheKey { + key: key.to_string(), + prefix: self.key_prefix.clone(), + }) + .await; + key + } + CacheKind::Accounts(key) => { + ACCOUNTS_CACHE + .remove(CacheKey { + key: key.to_string(), + prefix: self.key_prefix.clone(), + }) + .await; + key + } + CacheKind::CGraph(key) => { + CGRAPH_CACHE + .remove(CacheKey { + key: key.to_string(), + prefix: self.key_prefix.clone(), + }) + .await; + key + } + CacheKind::Routing(key) => { + ROUTING_CACHE + .remove(CacheKey { + key: key.to_string(), + prefix: self.key_prefix.clone(), + }) + .await; + key + } + CacheKind::DecisionManager(key) => { + DECISION_MANAGER_CACHE + .remove(CacheKey { + key: key.to_string(), + prefix: self.key_prefix.clone(), + }) + .await; + key + } + CacheKind::Surcharge(key) => { + SURCHARGE_CACHE + .remove(CacheKey { + key: key.to_string(), + prefix: self.key_prefix.clone(), + }) + .await; + key + } + CacheKind::All(key) => { + CONFIG_CACHE + .remove(CacheKey { + key: key.to_string(), + prefix: self.key_prefix.clone(), + }) + .await; + ACCOUNTS_CACHE + .remove(CacheKey { + key: key.to_string(), + prefix: self.key_prefix.clone(), + }) + .await; + CGRAPH_CACHE + .remove(CacheKey { + key: key.to_string(), + prefix: self.key_prefix.clone(), + }) + .await; + ROUTING_CACHE + .remove(CacheKey { + key: key.to_string(), + prefix: self.key_prefix.clone(), + }) + .await; + DECISION_MANAGER_CACHE + .remove(CacheKey { + key: key.to_string(), + prefix: self.key_prefix.clone(), + }) + .await; + SURCHARGE_CACHE + .remove(CacheKey { + key: key.to_string(), + prefix: self.key_prefix.clone(), + }) + .await; - self.delete_key(key.as_ref()) - .await - .map_err(|err| logger::error!("Error while deleting redis key: {err:?}")) - .ok(); + key + } + }; - logger::debug!("Done invalidating {key}"); + self.delete_key(key.as_ref()) + .await + .map_err(|err| logger::error!("Error while deleting redis key: {err:?}")) + .ok(); + + logger::debug!( + "Handled message on channel {channel_name} - Done invalidating {key}" + ); + } + _ => { + logger::debug!("Received message from unknown channel: {channel_name}"); + } + } } Ok(()) }