Skip to content

Commit

Permalink
refactor(redis): spawn one subscriber thread for handling all the pub…
Browse files Browse the repository at this point in the history
…lished messages to different channel (#5064)
  • Loading branch information
Chethan-rao authored Jun 21, 2024
1 parent 2005d3d commit 6a07e10
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 126 deletions.
6 changes: 5 additions & 1 deletion crates/redis_interface/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ impl RedisClient {

pub struct SubscriberClient {
inner: fred::clients::SubscriberClient,
pub is_subscriber_handler_spawned: Arc<atomic::AtomicBool>,
}

impl SubscriberClient {
Expand All @@ -83,7 +84,10 @@ impl SubscriberClient {
.wait_for_connect()
.await
.change_context(errors::RedisError::RedisConnectionError)?;
Ok(Self { inner: client })
Ok(Self {
inner: client,
is_subscriber_handler_spawned: Arc::new(atomic::AtomicBool::new(false)),
})
}
}

Expand Down
7 changes: 5 additions & 2 deletions crates/router/src/db/configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl ConfigInterface for Store {
self.get_redis_conn()
.map_err(Into::<errors::StorageError>::into)?
.publish(
cache::PUB_SUB_CHANNEL,
cache::IMC_INVALIDATION_CHANNEL,
CacheKind::Config((&inserted.key).into()),
)
.await
Expand Down Expand Up @@ -179,7 +179,10 @@ impl ConfigInterface for Store {

self.get_redis_conn()
.map_err(Into::<errors::StorageError>::into)?
.publish(cache::PUB_SUB_CHANNEL, CacheKind::Config(key.into()))
.publish(
cache::IMC_INVALIDATION_CHANNEL,
CacheKind::Config(key.into()),
)
.await
.map_err(Into::<errors::StorageError>::into)?;

Expand Down
2 changes: 1 addition & 1 deletion crates/router/src/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pub async fn get_store(
tenant,
master_enc_key,
cache_store,
storage_impl::redis::cache::PUB_SUB_CHANNEL,
storage_impl::redis::cache::IMC_INVALIDATION_CHANNEL,
)
.await?
};
Expand Down
4 changes: 2 additions & 2 deletions crates/storage_impl/src/redis/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
};

/// Redis channel name used for publishing invalidation messages
pub const PUB_SUB_CHANNEL: &str = "hyperswitch_invalidate";
pub const IMC_INVALIDATION_CHANNEL: &str = "hyperswitch_invalidate";

/// Prefix for config cache key
const CONFIG_CACHE_PREFIX: &str = "config";
Expand Down Expand Up @@ -392,7 +392,7 @@ pub async fn publish_into_redact_channel<'a, K: IntoIterator<Item = CacheKind<'a
let futures = keys.into_iter().map(|key| async {
redis_conn
.clone()
.publish(PUB_SUB_CHANNEL, key)
.publish(IMC_INVALIDATION_CHANNEL, key)
.await
.change_context(StorageError::KVError)
});
Expand Down
267 changes: 147 additions & 120 deletions crates/storage_impl/src/redis/pub_sub.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::atomic;

use error_stack::ResultExt;
use redis_interface::{errors as redis_errors, PubsubInterface, RedisValue};
use router_env::{logger, tracing::Instrument};
Expand Down Expand Up @@ -32,15 +34,29 @@ impl PubSubInterface for std::sync::Arc<redis_interface::RedisConnectionPool> {
.await
.change_context(redis_errors::RedisError::SubscribeError)?;

let redis_clone = self.clone();
let _task_handle = tokio::spawn(
async move {
if let Err(pubsub_error) = redis_clone.on_message().await {
logger::error!(?pubsub_error);
// Spawn only one thread handling all the published messages to different channels
if self
.subscriber
.is_subscriber_handler_spawned
.compare_exchange(
false,
true,
atomic::Ordering::SeqCst,
atomic::Ordering::SeqCst,
)
.is_ok()
{
let redis_clone = self.clone();
let _task_handle = tokio::spawn(
async move {
if let Err(pubsub_error) = redis_clone.on_message().await {
logger::error!(?pubsub_error);
}
}
}
.in_current_span(),
);
.in_current_span(),
);
}

Ok(())
}

Expand All @@ -61,120 +77,131 @@ impl PubSubInterface for std::sync::Arc<redis_interface::RedisConnectionPool> {
logger::debug!("Started on message: {:?}", self.key_prefix);
let mut rx = self.subscriber.on_message();
while let Ok(message) = rx.recv().await {
logger::debug!("Invalidating {message:?}");
let key = match CacheKind::try_from(RedisValue::new(message.value))
.change_context(redis_errors::RedisError::OnMessageError)
{
Ok(value) => value,
Err(err) => {
logger::error!(value_conversion_err=?err);
continue;
}
};

let key = match key {
CacheKind::Config(key) => {
CONFIG_CACHE
.remove(CacheKey {
key: key.to_string(),
prefix: self.key_prefix.clone(),
})
.await;
key
}
CacheKind::Accounts(key) => {
ACCOUNTS_CACHE
.remove(CacheKey {
key: key.to_string(),
prefix: self.key_prefix.clone(),
})
.await;
key
}
CacheKind::CGraph(key) => {
CGRAPH_CACHE
.remove(CacheKey {
key: key.to_string(),
prefix: self.key_prefix.clone(),
})
.await;
key
}
CacheKind::Routing(key) => {
ROUTING_CACHE
.remove(CacheKey {
key: key.to_string(),
prefix: self.key_prefix.clone(),
})
.await;
key
}
CacheKind::DecisionManager(key) => {
DECISION_MANAGER_CACHE
.remove(CacheKey {
key: key.to_string(),
prefix: self.key_prefix.clone(),
})
.await;
key
}
CacheKind::Surcharge(key) => {
SURCHARGE_CACHE
.remove(CacheKey {
key: key.to_string(),
prefix: self.key_prefix.clone(),
})
.await;
key
}
CacheKind::All(key) => {
CONFIG_CACHE
.remove(CacheKey {
key: key.to_string(),
prefix: self.key_prefix.clone(),
})
.await;
ACCOUNTS_CACHE
.remove(CacheKey {
key: key.to_string(),
prefix: self.key_prefix.clone(),
})
.await;
CGRAPH_CACHE
.remove(CacheKey {
key: key.to_string(),
prefix: self.key_prefix.clone(),
})
.await;
ROUTING_CACHE
.remove(CacheKey {
key: key.to_string(),
prefix: self.key_prefix.clone(),
})
.await;
DECISION_MANAGER_CACHE
.remove(CacheKey {
key: key.to_string(),
prefix: self.key_prefix.clone(),
})
.await;
SURCHARGE_CACHE
.remove(CacheKey {
key: key.to_string(),
prefix: self.key_prefix.clone(),
})
.await;

key
}
};
let channel_name = message.channel.to_string();
logger::debug!("Received message on channel: {channel_name}");

match channel_name.as_str() {
super::cache::IMC_INVALIDATION_CHANNEL => {
let key = match CacheKind::try_from(RedisValue::new(message.value))
.change_context(redis_errors::RedisError::OnMessageError)
{
Ok(value) => value,
Err(err) => {
logger::error!(value_conversion_err=?err);
continue;
}
};

let key = match key {
CacheKind::Config(key) => {
CONFIG_CACHE
.remove(CacheKey {
key: key.to_string(),
prefix: self.key_prefix.clone(),
})
.await;
key
}
CacheKind::Accounts(key) => {
ACCOUNTS_CACHE
.remove(CacheKey {
key: key.to_string(),
prefix: self.key_prefix.clone(),
})
.await;
key
}
CacheKind::CGraph(key) => {
CGRAPH_CACHE
.remove(CacheKey {
key: key.to_string(),
prefix: self.key_prefix.clone(),
})
.await;
key
}
CacheKind::Routing(key) => {
ROUTING_CACHE
.remove(CacheKey {
key: key.to_string(),
prefix: self.key_prefix.clone(),
})
.await;
key
}
CacheKind::DecisionManager(key) => {
DECISION_MANAGER_CACHE
.remove(CacheKey {
key: key.to_string(),
prefix: self.key_prefix.clone(),
})
.await;
key
}
CacheKind::Surcharge(key) => {
SURCHARGE_CACHE
.remove(CacheKey {
key: key.to_string(),
prefix: self.key_prefix.clone(),
})
.await;
key
}
CacheKind::All(key) => {
CONFIG_CACHE
.remove(CacheKey {
key: key.to_string(),
prefix: self.key_prefix.clone(),
})
.await;
ACCOUNTS_CACHE
.remove(CacheKey {
key: key.to_string(),
prefix: self.key_prefix.clone(),
})
.await;
CGRAPH_CACHE
.remove(CacheKey {
key: key.to_string(),
prefix: self.key_prefix.clone(),
})
.await;
ROUTING_CACHE
.remove(CacheKey {
key: key.to_string(),
prefix: self.key_prefix.clone(),
})
.await;
DECISION_MANAGER_CACHE
.remove(CacheKey {
key: key.to_string(),
prefix: self.key_prefix.clone(),
})
.await;
SURCHARGE_CACHE
.remove(CacheKey {
key: key.to_string(),
prefix: self.key_prefix.clone(),
})
.await;

self.delete_key(key.as_ref())
.await
.map_err(|err| logger::error!("Error while deleting redis key: {err:?}"))
.ok();
key
}
};

logger::debug!("Done invalidating {key}");
self.delete_key(key.as_ref())
.await
.map_err(|err| logger::error!("Error while deleting redis key: {err:?}"))
.ok();

logger::debug!(
"Handled message on channel {channel_name} - Done invalidating {key}"
);
}
_ => {
logger::debug!("Received message from unknown channel: {channel_name}");
}
}
}
Ok(())
}
Expand Down

0 comments on commit 6a07e10

Please sign in to comment.