Skip to content

Commit

Permalink
Core: Allow creation of RESP3 connections.
Browse files Browse the repository at this point in the history
  • Loading branch information
nihohit committed Dec 11, 2023
1 parent dd626e4 commit b15da06
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 8 deletions.
9 changes: 6 additions & 3 deletions babushka-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,18 @@ fn chars_to_string_option(chars: &::protobuf::Chars) -> Option<String> {
pub(super) fn get_redis_connection_info(
authentication_info: Option<Box<AuthenticationInfo>>,
database_id: u32,
use_resp3: bool,
) -> redis::RedisConnectionInfo {
match authentication_info {
Some(info) => redis::RedisConnectionInfo {
db: database_id as i64,
username: chars_to_string_option(&info.username),
password: chars_to_string_option(&info.password),
use_resp3: false,
use_resp3,
},
None => redis::RedisConnectionInfo {
db: database_id as i64,
use_resp3: false,
use_resp3,
..Default::default()
},
}
Expand Down Expand Up @@ -170,7 +171,8 @@ async fn create_cluster_client(
) -> RedisResult<redis::cluster_async::ClusterConnection> {
// TODO - implement timeout for each connection attempt
let tls_mode = request.tls_mode.enum_value_or_default();
let redis_connection_info = get_redis_connection_info(request.authentication_info.0, 0);
let redis_connection_info =
get_redis_connection_info(request.authentication_info.0, 0, request.use_resp3);
let initial_nodes: Vec<_> = request
.addresses
.into_iter()
Expand All @@ -183,6 +185,7 @@ async fn create_cluster_client(
if read_from_replicas {
builder = builder.read_from_replicas();
}
builder = builder.use_resp3(request.use_resp3);
if tls_mode != TlsMode::NoTls {
let tls = if tls_mode == TlsMode::SecureTls {
redis::cluster::TlsMode::Secure
Expand Down
1 change: 1 addition & 0 deletions babushka-core/src/client/standalone_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ impl StandaloneClient {
let redis_connection_info = get_redis_connection_info(
connection_request.authentication_info.0,
connection_request.database_id,
connection_request.use_resp3,
);

let tls_mode = connection_request.tls_mode.enum_value_or_default();
Expand Down
1 change: 1 addition & 0 deletions babushka-core/src/protobuf/connection_request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ message ConnectionRequest {
ConnectionRetryStrategy connection_retry_strategy = 6;
AuthenticationInfo authentication_info = 7;
uint32 database_id = 8;
bool use_resp3 = 9;
}

message ConnectionRetryStrategy {
Expand Down
67 changes: 67 additions & 0 deletions babushka-core/tests/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ mod utilities;
mod shared_client_tests {
use super::*;
use babushka::client::Client;
use redis::RedisConnectionInfo;
use redis::Value;
use rstest::rstest;
use utilities::cluster::*;
use utilities::*;
Expand Down Expand Up @@ -74,6 +76,71 @@ mod shared_client_tests {
});
}

#[rstest]
#[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]
fn test_resp3_support(#[values(false, true)] use_cluster: bool) {
block_on_all(async {
let mut test_basics = setup_test_basics(
use_cluster,
TestConfiguration {
shared_server: true,
connection_info: Some(RedisConnectionInfo {
use_resp3: true,
..Default::default()
}),
..Default::default()
},
)
.await;
let hello: std::collections::HashMap<String, Value> = redis::from_redis_value(
&test_basics
.client
.req_packed_command(&redis::cmd("HELLO"), None)
.await
.unwrap(),
)
.unwrap();
assert_eq!(hello.get("proto").unwrap(), &Value::Int(3));

let mut cmd = redis::cmd("HSET");
cmd.arg("hash").arg("foo").arg("baz");
test_basics
.client
.req_packed_command(&cmd, None)
.await
.unwrap();
let mut cmd = redis::cmd("HSET");
cmd.arg("hash").arg("bar").arg("foobar");
test_basics
.client
.req_packed_command(&cmd, None)
.await
.unwrap();

let mut cmd = redis::cmd("HGETALL");
cmd.arg("hash");
let result = test_basics
.client
.req_packed_command(&cmd, None)
.await
.unwrap();

assert_eq!(
result,
Value::Map(vec![
(
Value::BulkString("foo".as_bytes().to_vec()),
Value::BulkString("baz".as_bytes().to_vec())
),
(
Value::BulkString("bar".as_bytes().to_vec()),
Value::BulkString("foobar".as_bytes().to_vec())
)
])
);
});
}

#[rstest]
#[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]
fn test_client_handle_concurrent_workload(
Expand Down
6 changes: 4 additions & 2 deletions babushka-core/tests/utilities/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,10 @@ pub async fn setup_test_basics_internal(mut configuration: TestConfiguration) ->
};

if let Some(redis_connection_info) = &configuration.connection_info {
assert!(!configuration.shared_server);
setup_acl_for_cluster(&addresses, redis_connection_info).await;
if redis_connection_info.password.is_some() {
assert!(!configuration.shared_server);
setup_acl_for_cluster(&addresses, redis_connection_info).await;
}
}
configuration.cluster_mode = ClusterMode::Enabled;
configuration.request_timeout = configuration.request_timeout.or(Some(10000));
Expand Down
7 changes: 5 additions & 2 deletions babushka-core/tests/utilities/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,7 @@ fn set_connection_info_to_connection_request(
connection_info: RedisConnectionInfo,
connection_request: &mut connection_request::ConnectionRequest,
) {
connection_request.use_resp3 = connection_info.use_resp3;
if connection_info.password.is_some() {
connection_request.authentication_info =
protobuf::MessageField(Some(Box::new(AuthenticationInfo {
Expand Down Expand Up @@ -660,8 +661,10 @@ pub(crate) async fn setup_test_basics_internal(configuration: &TestConfiguration
};

if let Some(redis_connection_info) = &configuration.connection_info {
assert!(!configuration.shared_server);
setup_acl(&connection_addr, redis_connection_info).await;
if redis_connection_info.password.is_some() {
assert!(!configuration.shared_server);
setup_acl(&connection_addr, redis_connection_info).await;
}
}
let mut connection_request = create_connection_request(&[connection_addr], configuration);
connection_request.cluster_mode_enabled = false;
Expand Down

0 comments on commit b15da06

Please sign in to comment.