Skip to content

Commit

Permalink
Core: add az awareness to read strategy (valkey-io#2539)
Browse files Browse the repository at this point in the history
* redis-core: add az awareness to read strategy

* remove unessecrry get and set and fix tests

* remove unessecrry az return type

* add version check with mutex to skip tests in runtime

Signed-off-by: Adar Ovadia <[email protected]>

---------

Signed-off-by: Adar Ovadia <[email protected]>
Co-authored-by: Adar Ovadia <[email protected]>
  • Loading branch information
adarovadya and Adar Ovadia authored Nov 18, 2024
1 parent de7892c commit ddd98cc
Show file tree
Hide file tree
Showing 26 changed files with 871 additions and 95 deletions.
1 change: 1 addition & 0 deletions .github/workflows/redis-rs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ jobs:
working-directory: ./glide-core/redis-rs/redis

- name: Test
# TODO remove the concurrency limit after we fix test flakyness.
run: |
cargo test --release -- --test-threads=1 | tee ../test-results.xml
echo "### Tests passed :v:" >> $GITHUB_STEP_SUMMARY
Expand Down
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,10 @@
* Node: Add `JSON.STRLEN` and `JSON.STRAPPEND` command ([#2537](https://github.com/valkey-io/valkey-glide/pull/2537))
* Node: Add `FT.SEARCH` ([#2551](https://github.com/valkey-io/valkey-glide/pull/2551))
* Python: Fix example ([#2556](https://github.com/valkey-io/valkey-glide/issues/2556))
* Core: Add support for sending multi-slot JSON.MSET and JSON.MGET commands ([#2587]https://github.com/valkey-io/valkey-glide/pull/2587)
* Core: Add support for sending multi-slot JSON.MSET and JSON.MGET commands ([#2587](https://github.com/valkey-io/valkey-glide/pull/2587))
* Node: Add `JSON.DEBUG` command ([#2572](https://github.com/valkey-io/valkey-glide/pull/2572))
* Node: Add `JSON.NUMINCRBY` and `JSON.NUMMULTBY` command ([#2555](https://github.com/valkey-io/valkey-glide/pull/2555))
* Core: Add support to Availability Zone Affinity read strategy ([#2539](https://github.com/valkey-io/valkey-glide/pull/2539))
* Core: Fix list of readonly commands ([#2634](https://github.com/valkey-io/valkey-glide/pull/2634), [#2649](https://github.com/valkey-io/valkey-glide/pull/2649))
* Core: Improve retry logic and update unmaintained dependencies for Rust lint CI ([#2673](https://github.com/valkey-io/valkey-glide/pull/2643))
* Core: Release the read lock while creating connections in `refresh_connections` ([#2630](https://github.com/valkey-io/valkey-glide/issues/2630))
Expand Down
2 changes: 2 additions & 0 deletions glide-core/redis-rs/redis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ uuid = { version = "1.6.1", optional = true }

telemetrylib = { path = "../../telemetry" }

lazy_static = "1"

[features]
default = [
"acl",
Expand Down
2 changes: 1 addition & 1 deletion glide-core/redis-rs/redis/src/aio/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ where
pubsub: false,
protocol: connection_info.protocol,
};
setup_connection(connection_info, &mut rv).await?;
setup_connection(connection_info, &mut rv, false).await?;
Ok(rv)
}

Expand Down
51 changes: 49 additions & 2 deletions glide-core/redis-rs/redis/src/aio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use crate::cmd::{cmd, Cmd};
use crate::connection::{
get_resp3_hello_command_error, PubSubSubscriptionKind, RedisConnectionInfo,
};
use crate::types::{ErrorKind, ProtocolVersion, RedisFuture, RedisResult, Value};
use crate::types::{
ErrorKind, FromRedisValue, InfoDict, ProtocolVersion, RedisError, RedisFuture, RedisResult,
Value,
};
use crate::PushKind;
use ::tokio::io::{AsyncRead, AsyncWrite};
use async_trait::async_trait;
Expand Down Expand Up @@ -84,6 +87,14 @@ pub trait ConnectionLike {

/// Returns the state of the connection
fn is_closed(&self) -> bool;

/// Get the connection availibility zone
fn get_az(&self) -> Option<String> {
None
}

/// Set the connection availibility zone
fn set_az(&mut self, _az: Option<String>) {}
}

/// Implements ability to notify about disconnection events
Expand All @@ -105,8 +116,40 @@ impl Clone for Box<dyn DisconnectNotifier> {
}
}

// Helper function to extract and update availability zone from INFO command
async fn update_az_from_info<C>(con: &mut C) -> RedisResult<()>
where
C: ConnectionLike,
{
let info_res = con.req_packed_command(&cmd("INFO")).await;

match info_res {
Ok(value) => {
let info_dict: InfoDict = FromRedisValue::from_redis_value(&value)?;
if let Some(node_az) = info_dict.get::<String>("availability_zone") {
con.set_az(Some(node_az));
}
Ok(())
}
Err(e) => {
// Handle the error case for the INFO command
Err(RedisError::from((
ErrorKind::ResponseError,
"Failed to execute INFO command. ",
format!("{:?}", e),
)))
}
}
}

// Initial setup for every connection.
async fn setup_connection<C>(connection_info: &RedisConnectionInfo, con: &mut C) -> RedisResult<()>
async fn setup_connection<C>(
connection_info: &RedisConnectionInfo,
con: &mut C,
// This parameter is set to 'true' if ReadFromReplica strategy is set to AZAffinity.
// An INFO command will be triggered in the connection's setup to update the 'availability_zone' property.
discover_az: bool,
) -> RedisResult<()>
where
C: ConnectionLike,
{
Expand Down Expand Up @@ -181,6 +224,10 @@ where
}
}

if discover_az {
update_az_from_info(con).await?;
}

// result is ignored, as per the command's instructions.
// https://redis.io/commands/client-setinfo/
#[cfg(not(feature = "disable-client-setinfo"))]
Expand Down
33 changes: 32 additions & 1 deletion glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ pub struct MultiplexedConnection {
response_timeout: Duration,
protocol: ProtocolVersion,
push_manager: PushManager,
availability_zone: Option<String>,
password: Option<String>,
}

Expand Down Expand Up @@ -479,11 +480,16 @@ impl MultiplexedConnection {
.with_push_manager(pm)
.with_protocol(connection_info.redis.protocol)
.with_password(connection_info.redis.password.clone())
.with_availability_zone(None)
.build()
.await?;

let driver = {
let auth = setup_connection(&connection_info.redis, &mut con);
let auth = setup_connection(
&connection_info.redis,
&mut con,
glide_connection_options.discover_az,
);

futures_util::pin_mut!(auth);

Expand Down Expand Up @@ -575,6 +581,11 @@ impl MultiplexedConnection {
self.pipeline.set_push_manager(push_manager).await;
}

/// For external visibilty (glide-core)
pub fn get_availability_zone(&self) -> Option<String> {
self.availability_zone.clone()
}

/// Replace the password used to authenticate with the server.
/// If `None` is provided, the password will be removed.
pub async fn update_connection_password(
Expand All @@ -599,6 +610,8 @@ pub struct MultiplexedConnectionBuilder {
push_manager: Option<PushManager>,
protocol: Option<ProtocolVersion>,
password: Option<String>,
/// Represents the node's availability zone
availability_zone: Option<String>,
}

impl MultiplexedConnectionBuilder {
Expand All @@ -611,6 +624,7 @@ impl MultiplexedConnectionBuilder {
push_manager: None,
protocol: None,
password: None,
availability_zone: None,
}
}

Expand Down Expand Up @@ -644,6 +658,12 @@ impl MultiplexedConnectionBuilder {
self
}

/// Sets the avazilability zone for the `MultiplexedConnectionBuilder`.
pub fn with_availability_zone(mut self, az: Option<String>) -> Self {
self.availability_zone = az;
self
}

/// Builds and returns a new `MultiplexedConnection` instance using the configured settings.
pub async fn build(self) -> RedisResult<MultiplexedConnection> {
let db = self.db.unwrap_or_default();
Expand All @@ -661,6 +681,7 @@ impl MultiplexedConnectionBuilder {
push_manager,
protocol,
password,
availability_zone: self.availability_zone,
};

Ok(con)
Expand Down Expand Up @@ -688,6 +709,16 @@ impl ConnectionLike for MultiplexedConnection {
fn is_closed(&self) -> bool {
self.pipeline.is_closed()
}

/// Get the node's availability zone
fn get_az(&self) -> Option<String> {
self.availability_zone.clone()
}

/// Set the node's availability zone
fn set_az(&mut self, az: Option<String>) {
self.availability_zone = az;
}
}
impl MultiplexedConnection {
/// Subscribes to a new channel.
Expand Down
5 changes: 4 additions & 1 deletion glide-core/redis-rs/redis/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ pub struct GlideConnectionOptions {
#[cfg(feature = "aio")]
/// Passive disconnect notifier
pub disconnect_notifier: Option<Box<dyn DisconnectNotifier>>,
/// If ReadFromReplica strategy is set to AZAffinity, this parameter will be set to 'true'.
/// In this case, an INFO command will be triggered in the connection's setup to update the connection's 'availability_zone' property.
pub discover_az: bool,
}

/// To enable async support you need to enable the feature: `tokio-comp`
Expand Down Expand Up @@ -164,7 +167,7 @@ impl Client {
/// For Unix connections, returns (async connection, None)
#[cfg(feature = "tokio-comp")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
pub async fn get_multiplexed_async_connection_and_ip(
pub async fn get_multiplexed_async_connection_ip(
&self,
glide_connection_options: GlideConnectionOptions,
) -> RedisResult<(crate::aio::MultiplexedConnection, Option<IpAddr>)> {
Expand Down
7 changes: 5 additions & 2 deletions glide-core/redis-rs/redis/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,10 @@ where
) -> RedisResult<Self> {
let connection = Self {
connections: RefCell::new(HashMap::new()),
slots: RefCell::new(SlotMap::new(vec![], cluster_params.read_from_replicas)),
slots: RefCell::new(SlotMap::new(
vec![],
cluster_params.read_from_replicas.clone(),
)),
auto_reconnect: RefCell::new(true),
cluster_params,
read_timeout: RefCell::new(None),
Expand Down Expand Up @@ -384,7 +387,7 @@ where
"can't parse node address",
)))?;
match parse_and_count_slots(&value, self.cluster_params.tls, addr).map(|slots_data| {
SlotMap::new(slots_data.1, self.cluster_params.read_from_replicas)
SlotMap::new(slots_data.1, self.cluster_params.read_from_replicas.clone())
}) {
Ok(new_slots) => {
result = Ok(new_slots);
Expand Down
Loading

0 comments on commit ddd98cc

Please sign in to comment.