From 4ec2034b6c27d1fc08148543756a12878e2338fb Mon Sep 17 00:00:00 2001
From: Shoham Elias
Date: Thu, 16 Jan 2025 20:11:15 +0000
Subject: [PATCH] save work

Signed-off-by: Shoham Elias
---
 .../redis-rs/redis/src/cluster_async/mod.rs | 631 ++++++++++--------
 1 file changed, 359 insertions(+), 272 deletions(-)

diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs
index 98f3ee4155..85bceb5aec 100644
--- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs
+++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs
@@ -38,7 +38,7 @@ use crate::{
     },
     cmd,
     commands::cluster_scan::{cluster_scan, ClusterScanArgs, ScanStateRC},
-    types, FromRedisValue, InfoDict, Pipeline,
+    FromRedisValue, InfoDict, Pipeline,
 };
 use dashmap::DashMap;
 use std::{
@@ -640,9 +640,9 @@ fn route_for_pipeline(pipeline: &crate::Pipeline) -> RedisResult<Option<Route>>
         }
     }
 
-    // Find first specific slot and send to it. There's no need to check If later commands
-    // should be routed to a different slot, since the server will return an error indicating this.
     if pipeline.is_atomic() {
+        // Find the first specific slot and send to it. There's no need to check if later commands
+        // should be routed to a different slot, since the server will return an error indicating this.
         pipeline
             .cmd_iter()
             .map(route_for_command)
@@ -664,6 +664,7 @@ fn route_for_pipeline(pipeline: &crate::Pipeline) -> RedisResult<Option<Route>>
             }
         })
     } else {
+        // The pipeline is not atomic, so its commands may map to different slots.
         Ok(None)
     }
 }
@@ -1075,6 +1076,62 @@ enum ConnectionCheck {
     RandomConnection,
 }
 
+/// Represents a pipeline command execution context for a specific node
+#[derive(Default)]
+struct NodePipelineContext<C> {
+    /// The pipeline of commands to be executed
+    pipeline: Pipeline,
+    /// The connection to the node
+    connection: C,
+    /// Vector of (command_index, inner_index) pairs tracking command order
+    /// command_index: Position in the original pipeline
+    /// inner_index: Optional sub-index for multi-node operations (e.g. MSET)
+    command_indices: Vec<(usize, Option<usize>)>,
+}
+
+/// Maps node addresses to their pipeline execution contexts
+type NodePipelineMap<C> = HashMap<String, NodePipelineContext<C>>;
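+
+// Illustrative sketch (addresses and keys are assumed, not taken from the patch): routing
+// the pipeline [SET k1 v1, MGET k1 k2] across two nodes could produce
+//   "10.0.0.1:6379" -> pipeline [SET k1 v1, MGET k1], command_indices [(0, None), (1, Some(0))]
+//   "10.0.0.2:6379" -> pipeline [MGET k2],            command_indices [(1, Some(1))]
+// so that each per-node reply can be written back to its (command_index, inner_index)
+// position in the original pipeline's result vector.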
+
+impl<C> NodePipelineContext<C> {
+    fn new(connection: C) -> Self {
+        Self {
+            pipeline: Pipeline::new(),
+            connection,
+            command_indices: Vec::new(),
+        }
+    }
+
+    // Adds a command to the pipeline and records its index
+    fn add_command(&mut self, cmd: Cmd, index: usize, inner_index: Option<usize>) {
+        self.pipeline.add_command(cmd);
+        self.command_indices.push((index, inner_index));
+    }
+}
+
+/// Adds a command to the pipeline of the node with the given address
+fn add_command_to_node_pipeline<C>(
+    pipeline_map: &mut NodePipelineMap<C>,
+    address: String,
+    connection: C,
+    cmd: Cmd,
+    index: usize,
+    inner_index: Option<usize>,
+) {
+    pipeline_map
+        .entry(address)
+        .or_insert_with(|| NodePipelineContext::new(connection))
+        .add_command(cmd, index, inner_index);
+}
+
+/// Adds a command to a random existing node in the pipeline map
+fn add_to_random_existing_node<C>(pipeline_map: &mut NodePipelineMap<C>, cmd: Cmd, index: usize) {
+    let mut rng = rand::thread_rng();
+    let addresses: Vec<_> = pipeline_map.keys().cloned().collect();
+    let random_address = addresses.choose(&mut rng).unwrap();
+    let context = pipeline_map.get_mut(random_address).unwrap();
+    context.add_command(cmd, index, None);
+}
+
 impl<C> ClusterConnInner<C>
 where
     C: ConnectionLike + Connect + Clone + Send + Sync + 'static,
@@ -2146,192 +2203,143 @@ where
             .map(Response::Multiple)
             .map_err(|err| (OperationTarget::Node { address }, err))
     }
-    #[allow(clippy::type_complexity)]
-    fn add_command_to_pipeline_map(
-        pipelines_by_connection: &mut HashMap<String, (Pipeline, C, Vec<(usize, Option<usize>)>)>,
-        address: String,
-        conn: C,
-        cmd: Cmd,
+
+    // Handles commands with MultiSlot routing info (like MSET or MGET): builds a sub-command
+    // for each slot group and adds it to the pipeline of the node that owns those slots.
+    async fn handle_multi_slot_routing(
+        pipelines_by_connection: &mut NodePipelineMap<C>,
+        core: Core<C>,
+        cmd: &Cmd,
         index: usize,
-        inner_index: Option<usize>,
+        slots: Vec<(Route, Vec<usize>)>,
     ) {
-        pipelines_by_connection
-            .entry(address.clone())
-            .or_insert_with(|| (Pipeline::new(), conn.clone(), Vec::new()))
-            .0
-            .add_command(cmd);
-        pipelines_by_connection
-            .entry(address)
-            .or_insert_with(|| (Pipeline::new(), conn, Vec::new()))
-            .2
-            .push((index, inner_index));
-    }
-
-    async fn routes_pipeline_commands(
+        // inner_index keeps track of the index of the sub-command inside cmd
+        for (inner_index, (route, indices)) in slots.iter().enumerate() {
+            let conn = {
+                let lock = core.conn_lock.read().expect(MUTEX_READ_ERR);
+                lock.connection_for_route(route)
+            };
+            if let Some((address, conn)) = conn {
+                let new_cmd =
+                    crate::cluster_routing::command_for_multi_slot_indices(cmd, indices.iter());
+                add_command_to_node_pipeline(
+                    pipelines_by_connection,
+                    address,
+                    conn.await,
+                    new_cmd,
+                    index,
+                    Some(inner_index),
+                );
+            }
+        }
+    }
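+
+    // Rough illustration (key placement assumed): for `MSET k1 v1 k2 v2` where `k1` and
+    // `k2` hash to slots owned by different nodes, `slots` holds one (Route, indices)
+    // entry per node, and `command_for_multi_slot_indices` builds the per-node
+    // sub-commands `MSET k1 v1` (inner_index 0) and `MSET k2 v2` (inner_index 1).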
+
+    // Converts the public single-node routing info into the internal routing representation.
+    fn determine_internal_routing(
+        routing_info: SingleNodeRoutingInfo,
+    ) -> InternalSingleNodeRouting<C> {
+        match routing_info {
+            SingleNodeRoutingInfo::Random => SingleNodeRoutingInfo::Random.into(),
+            SingleNodeRoutingInfo::SpecificNode(route) => {
+                let route_single: SingleNodeRoutingInfo = Some(route).into();
+                route_single.into()
+            }
+            SingleNodeRoutingInfo::RandomPrimary => {
+                let route_single: SingleNodeRoutingInfo = Some(Route::new_random_primary()).into();
+                route_single.into()
+            }
+            SingleNodeRoutingInfo::ByAddress { host, port } => {
+                let address = format!("{host}:{port}");
+                InternalSingleNodeRouting::ByAddress(address)
+            }
+        }
+    }
+
+    // Handles commands with SingleNode routing info.
+    async fn handle_single_node_route(
+        pipeline_map: &mut NodePipelineMap<C>,
+        cmd: Cmd,
+        routing: InternalSingleNodeRouting<C>,
+        core: Core<C>,
+        index: usize,
+    ) -> Result<(), (OperationTarget, RedisError)> {
+        if matches!(routing, InternalSingleNodeRouting::Random) && !pipeline_map.is_empty() {
+            // The command is routed to a random node and sub-pipelines already exist in our
+            // pipelines map, so just add it to one of the existing sub-pipelines.
+            add_to_random_existing_node(pipeline_map, cmd, index);
+            Ok(())
+        } else {
+            let (address, conn) = Self::get_connection(routing, core, Some(Arc::new(cmd.clone())))
+                .await
+                .map_err(|err| (OperationTarget::NotFound, err))?;
+            add_command_to_node_pipeline(pipeline_map, address, conn, cmd, index, None);
+            Ok(())
+        }
+    }
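+
+    // Illustrative dispatch (routing as assigned by cluster_routing): `GET key` yields
+    // SpecificNode(route) and is pinned to the node that owns the key's slot, while a
+    // random-routed command such as `ECHO` simply joins an already-created sub-pipeline
+    // when one exists, instead of opening a connection to yet another random node.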
+
+    /// Maps the commands in a pipeline to the appropriate nodes based on their routing information.
+    ///
+    /// This function processes each command in the given pipeline, determines its routing information,
+    /// and organizes it into a map of node pipelines. It handles both single-node and multi-node routing
+    /// strategies and ensures that the commands are distributed accordingly.
+    ///
+    /// It also collects response policies for multi-node routing and returns them along with the pipeline map.
+    /// This ensures we can properly aggregate the responses from the different nodes later on.
+    ///
+    /// # Arguments
+    ///
+    /// * `pipeline` - A reference to the pipeline containing the commands to route.
+    /// * `core` - The core object that provides access to connection locks and other resources.
+    ///
+    /// # Returns
+    ///
+    /// A `RedisResult` containing a tuple:
+    ///
+    /// - A `NodePipelineMap<C>` where commands are grouped by their corresponding nodes.
+    /// - A `Vec<(usize, MultipleNodeRoutingInfo, Option<ResponsePolicy>)>` containing the routing information
+    ///   and response policies for multi-node commands, along with the index of each command in the pipeline.
+    ///
+    async fn map_pipeline_to_nodes(
         pipeline: &crate::Pipeline,
         core: Core<C>,
     ) -> RedisResult<(
-        HashMap<String, (Pipeline, C, Vec<(usize, Option<usize>)>)>,
+        NodePipelineMap<C>,
         Vec<(usize, MultipleNodeRoutingInfo, Option<ResponsePolicy>)>,
     )> {
-        #[allow(clippy::type_complexity)]
-        let mut pipelines_by_connection: HashMap<
-            String,
-            (Pipeline, C, Vec<(usize, Option<usize>)>),
-        > = HashMap::new();
+        let mut pipelines_by_connection = NodePipelineMap::new();
         let mut response_policies = Vec::new();
 
         for (index, cmd) in pipeline.cmd_iter().enumerate() {
-            match cluster_routing::RoutingInfo::for_routable(cmd) {
-                Some(cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random))
-                | None => {
-                    if pipelines_by_connection.is_empty() {
-                        let conn = crate::cluster_async::ClusterConnInner::get_connection(
-                            SingleNodeRoutingInfo::Random.into(),
-                            core.clone(),
-                            Some(Arc::new(cmd.clone())),
-                        );
-                        let (address, conn) = conn.await.map_err(|err| {
-                            types::RedisError::from((
-                                types::ErrorKind::ConnectionNotFoundForRoute,
-                                "Requested connection not found",
-                                err.to_string(),
-                            ))
-                        })?;
-                        Self::add_command_to_pipeline_map(
-                            &mut pipelines_by_connection,
-                            address,
-                            conn,
-                            cmd.clone(),
-                            index,
-                            None,
-                        );
-                    } else {
-                        // since the map is not empty, add the command to a random connection within the map.
-                        let mut rng = rand::thread_rng();
-                        let keys: Vec<_> = pipelines_by_connection.keys().cloned().collect();
-                        let random_key = keys.choose(&mut rng).unwrap();
-                        pipelines_by_connection
-                            .get_mut(random_key)
-                            .unwrap()
-                            .0
-                            .add_command(cmd.clone());
-                        pipelines_by_connection
-                            .get_mut(random_key)
-                            .unwrap()
-                            .2
-                            .push((index, None));
-                    }
-                }
-                Some(cluster_routing::RoutingInfo::SingleNode(
-                    SingleNodeRoutingInfo::SpecificNode(route),
-                )) => {
-                    let route_single: SingleNodeRoutingInfo = Some(route).into();
-                    let conn = crate::cluster_async::ClusterConnInner::get_connection(
-                        route_single.into(),
-                        core.clone(),
-                        Some(Arc::new(cmd.clone())),
-                    );
-                    let (address, conn) = conn.await.map_err(|err| {
-                        types::RedisError::from((
-                            types::ErrorKind::ConnectionNotFoundForRoute,
-                            "Requested connection not found",
-                            err.to_string(),
-                        ))
-                    })?;
-                    Self::add_command_to_pipeline_map(
+            match cluster_routing::RoutingInfo::for_routable(cmd).unwrap_or(
+                cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random),
+            ) {
+                cluster_routing::RoutingInfo::SingleNode(route) => {
+                    let route = Self::determine_internal_routing(route);
+                    Self::handle_single_node_route(
                         &mut pipelines_by_connection,
-                        address,
-                        conn,
                         cmd.clone(),
-                        index,
-                        None,
-                    );
-                }
-                Some(cluster_routing::RoutingInfo::SingleNode(
-                    SingleNodeRoutingInfo::RandomPrimary,
-                )) => {
-                    let route_single: SingleNodeRoutingInfo =
-                        Some(Route::new_random_primary()).into();
-                    let conn = crate::cluster_async::ClusterConnInner::get_connection(
-                        route_single.into(),
+                        route,
                         core.clone(),
-                        Some(Arc::new(cmd.clone())),
-                    );
-                    let (address, conn) = conn.await.map_err(|err| {
-                        types::RedisError::from((
-                            types::ErrorKind::ConnectionNotFoundForRoute,
-                            "Requested connection not found",
-                            err.to_string(),
-                        ))
-                    })?;
-                    Self::add_command_to_pipeline_map(
-                        &mut pipelines_by_connection,
-                        address,
-                        conn,
-                        cmd.clone(),
                         index,
-                        None,
-                    );
-                }
-                Some(cluster_routing::RoutingInfo::SingleNode(
-                    SingleNodeRoutingInfo::ByAddress { host, port },
-                )) => {
-                    let address = format!("{host}:{port}");
-                    let conn = crate::cluster_async::ClusterConnInner::get_connection(
-                        InternalSingleNodeRouting::ByAddress(address.clone()),
-                        core.clone(),
-                        Some(Arc::new(cmd.clone())),
-                    );
-                    let (address, conn) = conn.await.map_err(|err| {
-                        types::RedisError::from((
-                            types::ErrorKind::ConnectionNotFoundForRoute,
-                            "Requested connection not found",
-                            err.to_string(),
-                        ))
-                    })?;
-                    Self::add_command_to_pipeline_map(
-                        &mut pipelines_by_connection,
-                        address,
-                        conn,
-                        cmd.clone(),
-                        index,
-                        None,
-                    );
+                    )
+                    .await
+                    .map_err(|(_target, err)| err)?;
                 }
-                Some(cluster_routing::RoutingInfo::MultiNode((
-                    multi_node_routing,
-                    response_policy,
-                ))) => {
+
+                cluster_routing::RoutingInfo::MultiNode((multi_node_routing, response_policy)) => {
                     // save the routing info and response policy, so we will be able to aggregate the results later
                     response_policies.push((index, multi_node_routing.clone(), response_policy));
                     match multi_node_routing {
-                        MultipleNodeRoutingInfo::AllNodes => {
-                            let all_nodes: Vec<_> = {
+                        MultipleNodeRoutingInfo::AllNodes | MultipleNodeRoutingInfo::AllMasters => {
+                            let connections: Vec<_> = {
                                 let lock = core.conn_lock.read().expect(MUTEX_READ_ERR);
-                                lock.all_node_connections().collect()
-                            };
-                            for (inner_index, (address, conn)) in all_nodes.into_iter().enumerate()
-                            {
-                                Self::add_command_to_pipeline_map(
-                                    &mut pipelines_by_connection,
-                                    address,
-                                    conn.await,
-                                    cmd.clone(),
-                                    index,
-                                    Some(inner_index),
-                                );
-                            }
-                        }
-                        MultipleNodeRoutingInfo::AllMasters => {
-                            let all_primaries: Vec<_> = {
-                                let lock = core.conn_lock.read().expect(MUTEX_READ_ERR);
-                                lock.all_primary_connections().collect()
+                                if matches!(multi_node_routing, MultipleNodeRoutingInfo::AllNodes) {
+                                    lock.all_node_connections().collect()
+                                } else {
+                                    lock.all_primary_connections().collect()
+                                }
                             };
                             for (inner_index, (address, conn)) in
-                                all_primaries.into_iter().enumerate()
+                                connections.into_iter().enumerate()
                             {
-                                Self::add_command_to_pipeline_map(
+                                add_command_to_node_pipeline(
                                     &mut pipelines_by_connection,
                                     address,
                                     conn.await,
@@ -2342,27 +2350,14 @@ where
                                 }
                             }
                             MultipleNodeRoutingInfo::MultiSlot((slots, _)) => {
-                                for (inner_index, (route, indices)) in slots.iter().enumerate() {
-                                    let conn = {
-                                        let lock = core.conn_lock.read().expect(MUTEX_READ_ERR);
-                                        lock.connection_for_route(route)
-                                    };
-                                    if let Some((address, conn)) = conn {
-                                        let new_cmd =
-                                            crate::cluster_routing::command_for_multi_slot_indices(
-                                                cmd,
-                                                indices.iter(),
-                                            );
-                                        Self::add_command_to_pipeline_map(
-                                            &mut pipelines_by_connection,
-                                            address,
-                                            conn.await,
-                                            new_cmd,
-                                            index,
-                                            Some(inner_index),
-                                        );
-                                    }
-                                }
+                                Self::handle_multi_slot_routing(
+                                    &mut pipelines_by_connection,
+                                    core.clone(),
+                                    cmd,
+                                    index,
+                                    slots,
+                                )
+                                .await;
                             }
                         }
                     }
@@ -2370,6 +2365,158 @@ where
             }
         }
         Ok((pipelines_by_connection, response_policies))
     }
+
+    /// Executes a pipeline of commands on a specified node
+    async fn execute_pipeline_on_node(
+        address: String,
+        node_context: NodePipelineContext<C>,
+    ) -> Result<(Vec<(usize, Option<usize>)>, Vec<Value>, String), (OperationTarget, RedisError)>
+    {
+        let count = node_context.pipeline.len();
+        let result = Self::try_pipeline_request(Arc::new(node_context.pipeline), 0, count, async {
+            Ok((address.clone(), node_context.connection))
+        })
+        .await?;
+
+        match result {
+            Response::Multiple(values) => Ok((node_context.command_indices, values, address)),
+            _ => Err((
+                OperationTarget::FanOut,
+                RedisError::from((ErrorKind::ResponseError, "Unsupported response type")),
+            )),
+        }
+    }
+
+    /// Adds the result of a pipeline command to the `values_and_addresses` collection.
+    ///
+    /// This function updates the `values_and_addresses` vector at the given `index` and optionally at the
+    /// `inner_index` if provided. If `inner_index` is `Some`, it ensures that the vector at that index is large enough
+    /// to hold the value and address at the specified position, resizing it if necessary. If `inner_index` is `None`,
+    /// the value and address are simply appended to the vector.
+    ///
+    /// # Parameters
+    /// - `values_and_addresses`: A mutable reference to a vector of vectors that stores the results of pipeline commands.
+    /// - `index`: The index in `values_and_addresses` where the result should be stored.
+    /// - `inner_index`: An optional index within the vector at `index`, used to store the result at a specific position.
+    /// - `value`: The result value to store.
+    /// - `address`: The address associated with the result.
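+    ///
+    /// # Example
+    ///
+    /// Illustrative sketch (the API is crate-private, so this is not a runnable doc-test;
+    /// values and addresses are assumed):
+    ///
+    /// ```ignore
+    /// let mut results = vec![Vec::new(); 2];
+    /// // Single-node command at pipeline position 0:
+    /// Self::add_pipeline_result(&mut results, 0, None, Value::Okay, "10.0.0.1:6379".into());
+    /// // Sub-command 1 of a multi-slot command at position 1 arrives before sub-command 0;
+    /// // the inner vector is resized so the result can be stored out of order:
+    /// Self::add_pipeline_result(&mut results, 1, Some(1), Value::Nil, "10.0.0.2:6379".into());
+    /// assert_eq!(results[1].len(), 2); // position 0 still holds the placeholder
+    /// ```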
+    fn add_pipeline_result(
+        values_and_addresses: &mut [Vec<(Value, String)>],
+        index: usize,
+        inner_index: Option<usize>,
+        value: Value,
+        address: String,
+    ) {
+        match inner_index {
+            Some(inner_index) => {
+                // Ensure the vector at the given index is large enough to hold the value and address at the specified position
+                if values_and_addresses[index].len() <= inner_index {
+                    values_and_addresses[index]
+                        .resize(inner_index + 1, (Value::Nil, "".to_string()));
+                }
+                values_and_addresses[index][inner_index] = (value, address);
+            }
+            None => values_and_addresses[index].push((value, address)),
+        }
+    }
+
+    /// Aggregates responses for multi-node commands and updates the `values_and_addresses` vector.
+    ///
+    /// This function processes the provided `response_policies`, which contain information about how responses
+    /// from multiple nodes should be aggregated. For each policy:
+    /// - It collects responses and their source node addresses from the corresponding entry in `values_and_addresses`.
+    /// - Uses the routing information and optional response policy to aggregate the responses into a single result.
+    ///
+    /// The aggregated result replaces the existing entries in `values_and_addresses` for the given command index.
+    async fn aggregate_multi_node_commands(
+        values_and_addresses: &mut [Vec<(Value, String)>],
+        response_policies: Vec<(usize, MultipleNodeRoutingInfo, Option<ResponsePolicy>)>,
+    ) -> Result<(), (OperationTarget, RedisError)> {
+        for (index, routing_info, response_policy) in response_policies {
+            let response_receivers = values_and_addresses[index]
+                .iter()
+                .map(|(value, address)| {
+                    let (sender, receiver) = oneshot::channel();
+                    let _ = sender.send(Ok(Response::Single(value.clone())));
+                    (Some(address.clone()), receiver)
+                })
+                .collect();
+
+            let aggregated_response =
+                Self::aggregate_results(response_receivers, &routing_info, response_policy)
+                    .await
+                    .map_err(|err| (OperationTarget::FanOut, err))?;
+
+            values_and_addresses[index] = vec![(aggregated_response, "".to_string())];
+        }
+        Ok(())
+    }
+
+    /// Collects and processes the results of pipeline tasks from a `tokio::task::JoinSet`.
+    ///
+    /// This function iteratively retrieves completed tasks from the provided `join_set` and processes
+    /// their results. Successful results are added to the `values_and_addresses` vector using the
+    /// indices and values provided. If an error occurs in any task, it is recorded and returned as
+    /// the first encountered error.
+    ///
+    /// # Parameters
+    /// - `join_set`: A mutable reference to a `tokio::task::JoinSet` containing tasks that return:
+    ///   - `Ok((Vec<(usize, Option<usize>)>, Vec<Value>, String))`: On success, a tuple of:
+    ///     - A list of indices and optional inner indices corresponding to pipeline commands.
+    ///     - A list of `Value` results.
+    ///     - The `String` address where the task was executed.
+    ///   - `Err((OperationTarget, RedisError))`: On failure, an error detailing the operation target and the Redis error.
+    /// - `values_and_addresses`: A mutable slice of vectors, where each vector corresponds to a pipeline
+    ///   command's results. This is updated with the values and addresses from successful tasks.
+    ///
+    /// # Returns
+    /// - `Ok(Some((OperationTarget, RedisError)))`: If one or more tasks encountered an error, returns the first error.
+    /// - `Ok(None)`: If all tasks completed successfully.
+    /// - `Err((OperationTarget::FanOut, RedisError))`: If a task failed unexpectedly (e.g., due to a panic).
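+    ///
+    /// # Example
+    ///
+    /// Rough usage sketch (crate-private API; mirrors the pipeline branch of `try_request`
+    /// below, not a runnable doc-test):
+    ///
+    /// ```ignore
+    /// let mut join_set = tokio::task::JoinSet::new();
+    /// for (address, node_context) in pipelines_by_connection {
+    ///     join_set.spawn(Self::execute_pipeline_on_node(address, node_context));
+    /// }
+    /// let mut values_and_addresses = vec![Vec::new(); pipeline.len()];
+    /// if let Some((target, err)) =
+    ///     Self::collect_pipeline_tasks(&mut join_set, &mut values_and_addresses).await?
+    /// {
+    ///     return Err((target, err));
+    /// }
+    /// ```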
+    ///
+    /// # Behavior
+    /// - Processes successful results by calling `Self::add_pipeline_result` to update the
+    ///   `values_and_addresses` vector with the indices, values, and addresses.
+    /// - Records the first error encountered and continues processing the remaining tasks.
+    /// - Returns `Ok(None)` if all tasks complete successfully.
+    #[allow(clippy::type_complexity)]
+    async fn collect_pipeline_tasks(
+        join_set: &mut tokio::task::JoinSet<
+            Result<
+                (Vec<(usize, Option<usize>)>, Vec<Value>, String),
+                (OperationTarget, RedisError),
+            >,
+        >,
+        values_and_addresses: &mut [Vec<(Value, String)>],
+    ) -> Result<Option<(OperationTarget, RedisError)>, (OperationTarget, RedisError)> {
+        let mut first_error = None;
+
+        while let Some(future_result) = join_set.join_next().await {
+            match future_result {
+                Ok(Ok((indices, values, address))) => {
+                    for ((index, inner_index), value) in indices.into_iter().zip(values) {
+                        Self::add_pipeline_result(
+                            values_and_addresses,
+                            index,
+                            inner_index,
+                            value,
+                            address.clone(),
+                        );
+                    }
+                }
+                Ok(Err(e)) => first_error = first_error.or(Some(e)),
+                Err(e) => {
+                    return Err((
+                        OperationTarget::FanOut,
+                        std::io::Error::new(std::io::ErrorKind::Other, e.to_string()).into(),
+                    ))
+                }
+            }
+        }
+        Ok(first_error)
+    }
+
     async fn try_request(info: RequestInfo<C>, core: Core<C>) -> OperationResult {
         match info.cmd {
             CmdArg::Cmd { cmd, routing } => Self::try_cmd_request(cmd, routing, core).await,
@@ -2388,74 +2535,35 @@ where
                 )
                 .await
             } else {
+                // Distribute pipeline commands across cluster nodes based on routing rules.
+                // Returns:
+                // - pipelines_by_connection: Map of node addresses to their pipeline contexts
+                // - response_policies: List of response aggregation policies for multi-node operations
                 let (pipelines_by_connection, response_policies) =
-                    Self::routes_pipeline_commands(&pipeline, core.clone())
+                    Self::map_pipeline_to_nodes(&pipeline, core.clone())
                         .await
                         .map_err(|err| (OperationTarget::FanOut, err))?;
 
+                // Stores responses along with their source node addresses for each pipeline command.
+                //
+                // The outer `Vec` represents the pipeline commands, and each inner `Vec` contains (response, address) pairs.
+                // Since some commands can be executed across multiple nodes (e.g., multi-node commands), a single command
+                // might produce multiple responses, each from a different node. By storing the responses with their
+                // respective node addresses, we ensure that we have all the information needed to aggregate the results later.
+                // This structure is essential for handling scenarios where responses from multiple nodes must be combined.
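+                //
+                // Illustrative shape (addresses and values assumed): for the pipeline
+                // [GET k, DBSIZE], where DBSIZE fans out to every primary, this could hold
+                //   [[(val_k, "10.0.0.1:6379")], [(Int(3), "10.0.0.1:6379"), (Int(5), "10.0.0.2:6379")]]
+                // and the DBSIZE entries are later aggregated (here, summed) into one response.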
                 let mut values_and_addresses = vec![Vec::new(); pipeline.len()];
-                let mut first_error = None;
                 let mut final_responses: Vec<Value> = Vec::with_capacity(pipeline.len());
                 let mut join_set = tokio::task::JoinSet::new(); // Manage spawned tasks
 
-                for (address, (pipeline, conn, indices)) in pipelines_by_connection {
-                    // Spawn the async task
-                    join_set.spawn(async move {
-                        let count = pipeline.len();
-                        let result =
-                            Self::try_pipeline_request(Arc::new(pipeline), 0, count, async {
-                                Ok((address.clone(), conn))
-                            })
-                            .await?;
-                        match result {
-                            Response::Multiple(values) => Ok((indices, values, address)),
-                            _ => Err((
-                                OperationTarget::FanOut,
-                                RedisError::from((
-                                    ErrorKind::ResponseError,
-                                    "Unsupported response type",
-                                )),
-                            )),
-                        }
-                    });
+                for (address, node_context) in pipelines_by_connection {
+                    // Spawn a new task to execute the pipeline on the node
+                    join_set.spawn(Self::execute_pipeline_on_node(address, node_context));
                 }
 
                 // Wait for all spawned tasks to complete
-                while let Some(future_result) = join_set.join_next().await {
-                    match future_result {
-                        Err(e) => {
-                            return Err((
-                                OperationTarget::FanOut,
-                                std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
-                                    .into(),
-                            ));
-                        }
-                        Ok(Ok((indices, values, address))) => {
-                            for ((index, inner_index), value) in indices.into_iter().zip(values)
-                            {
-                                if let Some(inner_index) = inner_index {
-                                    // Ensure the vector is big enough to hold `inner_index`
-                                    if values_and_addresses[index].len() <= inner_index {
-                                        values_and_addresses[index].resize(
-                                            inner_index + 1,
-                                            (Value::Nil, "".to_string()),
-                                        );
-                                    }
-                                    // Add the value to the specific inner_index within index
-                                    values_and_addresses[index][inner_index] =
-                                        (value, address.clone());
-                                } else {
-                                    values_and_addresses[index].push((value, address.clone()));
-                                }
-                            }
-                        }
-                        Ok(Err(e)) => {
-                            if first_error.is_none() {
-                                first_error = Some(e);
-                            }
-                        }
-                    }
-                }
+                let first_error =
+                    Self::collect_pipeline_tasks(&mut join_set, &mut values_and_addresses)
+                        .await?;
 
                 // Check for errors
                 if let Some(first_error) = first_error {
@@ -2463,36 +2571,15 @@ where
                 }
 
                 // Process response policies after all tasks are complete
-                for (index, routing_info, response_policy) in response_policies {
-                    #[allow(clippy::type_complexity)]
-                    // Safely access `values_and_addresses` for the current index
-                    let response_receivers: Vec<(
-                        Option<String>,
-                        oneshot::Receiver<RedisResult<Response>>,
-                    )> = values_and_addresses[index]
-                        .iter()
-                        .map(|(value, address)| {
-                            let (sender, receiver) = oneshot::channel();
-                            let _ = sender.send(Ok(Response::Single(value.clone())));
-                            (Some(address.clone()), receiver)
-                        })
-                        .collect();
-
-                    let aggregated_response = Self::aggregate_results(
-                        response_receivers,
-                        &routing_info,
-                        response_policy,
-                    )
-                    .await
-                    .map_err(|err| (OperationTarget::FanOut, err))?;
-
-                    // Update `values_and_addresses` for the current index
-                    values_and_addresses[index] = vec![(aggregated_response, "".to_string())];
-                }
+                Self::aggregate_multi_node_commands(
+                    &mut values_and_addresses,
+                    response_policies,
+                )
+                .await?;
 
                 // Collect final responses
                 for mut value in values_and_addresses.into_iter() {
-                    assert_eq!(value.len(), 1);
+                    // unwrap() is safe here: after aggregation, every command holds exactly one (value, address) entry
                     final_responses.push(value.pop().unwrap().0);
                 }