diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 98f3ee4155..3e13d0e4f6 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -23,6 +23,8 @@ //! ``` mod connections_container; +mod pipeline_routing; + mod connections_logic; /// Exposed only for testing. pub mod testing { @@ -38,9 +40,12 @@ use crate::{ }, cmd, commands::cluster_scan::{cluster_scan, ClusterScanArgs, ScanStateRC}, - types, FromRedisValue, InfoDict, Pipeline, + FromRedisValue, InfoDict, }; use dashmap::DashMap; +use pipeline_routing::{ + collect_pipeline_tasks, execute_pipeline_on_node, map_pipeline_to_nodes, route_for_pipeline, +}; use std::{ collections::{HashMap, HashSet}, fmt, io, mem, @@ -93,7 +98,6 @@ use tokio::{sync::Notify, time::timeout}; use dispose::{Disposable, Dispose}; use futures::{future::BoxFuture, prelude::*, ready}; use pin_project_lite::pin_project; -use rand::seq::SliceRandom; use std::sync::RwLock as StdRwLock; use tokio::sync::{ mpsc, @@ -622,51 +626,6 @@ enum Operation { UpdateConnectionPassword(Option), } -fn route_for_pipeline(pipeline: &crate::Pipeline) -> RedisResult> { - fn route_for_command(cmd: &Cmd) -> Option { - match cluster_routing::RoutingInfo::for_routable(cmd) { - Some(cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) => None, - Some(cluster_routing::RoutingInfo::SingleNode( - SingleNodeRoutingInfo::SpecificNode(route), - )) => Some(route), - Some(cluster_routing::RoutingInfo::SingleNode( - SingleNodeRoutingInfo::RandomPrimary, - )) => Some(Route::new_random_primary()), - Some(cluster_routing::RoutingInfo::MultiNode(_)) => None, - Some(cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::ByAddress { - .. - })) => None, - None => None, - } - } - - // Find first specific slot and send to it. There's no need to check If later commands - // should be routed to a different slot, since the server will return an error indicating this. - if pipeline.is_atomic() { - pipeline - .cmd_iter() - .map(route_for_command) - .try_fold(None, |chosen_route, next_cmd_route| { - match (chosen_route, next_cmd_route) { - (None, _) => Ok(next_cmd_route), - (_, None) => Ok(chosen_route), - (Some(chosen_route), Some(next_cmd_route)) => { - if chosen_route.slot() != next_cmd_route.slot() { - Err(( - ErrorKind::CrossSlot, - "Received crossed slots in transaction", - ) - .into()) - } else { - Ok(Some(chosen_route)) - } - } - } - }) - } else { - Ok(None) - } -} fn boxed_sleep(duration: Duration) -> BoxFuture<'static, ()> { Box::pin(tokio::time::sleep(duration)) } @@ -2133,7 +2092,7 @@ where .map_err(|err| (address.into(), err)) } - async fn try_pipeline_request( + pub async fn try_pipeline_request( pipeline: Arc, offset: usize, count: usize, @@ -2146,230 +2105,39 @@ where .map(Response::Multiple) .map_err(|err| (OperationTarget::Node { address }, err)) } - #[allow(clippy::type_complexity)] - fn add_command_to_pipeline_map( - pipelines_by_connection: &mut HashMap)>)>, - address: String, - conn: C, - cmd: Cmd, - index: usize, - inner_index: Option, - ) { - pipelines_by_connection - .entry(address.clone()) - .or_insert_with(|| (Pipeline::new(), conn.clone(), Vec::new())) - .0 - .add_command(cmd); - pipelines_by_connection - .entry(address) - .or_insert_with(|| (Pipeline::new(), conn, Vec::new())) - .2 - .push((index, inner_index)); - } - - async fn routes_pipeline_commands( - pipeline: &crate::Pipeline, - core: Core, - ) -> RedisResult<( - HashMap)>)>, - Vec<(usize, MultipleNodeRoutingInfo, Option)>, - )> { - #[allow(clippy::type_complexity)] - let mut pipelines_by_connection: HashMap< - String, - (Pipeline, C, Vec<(usize, Option)>), - > = HashMap::new(); - let mut response_policies = Vec::new(); - - for (index, cmd) in pipeline.cmd_iter().enumerate() { - match cluster_routing::RoutingInfo::for_routable(cmd) { - Some(cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) - | None => { - if pipelines_by_connection.is_empty() { - let conn = crate::cluster_async::ClusterConnInner::get_connection( - SingleNodeRoutingInfo::Random.into(), - core.clone(), - Some(Arc::new(cmd.clone())), - ); - let (address, conn) = conn.await.map_err(|err| { - types::RedisError::from(( - types::ErrorKind::ConnectionNotFoundForRoute, - "Requested connection not found", - err.to_string(), - )) - })?; - Self::add_command_to_pipeline_map( - &mut pipelines_by_connection, - address, - conn, - cmd.clone(), - index, - None, - ); - } else { - // since the map is not empty, add the command to a random connection within the map. - let mut rng = rand::thread_rng(); - let keys: Vec<_> = pipelines_by_connection.keys().cloned().collect(); - let random_key = keys.choose(&mut rng).unwrap(); - pipelines_by_connection - .get_mut(random_key) - .unwrap() - .0 - .add_command(cmd.clone()); - pipelines_by_connection - .get_mut(random_key) - .unwrap() - .2 - .push((index, None)); - } - } - Some(cluster_routing::RoutingInfo::SingleNode( - SingleNodeRoutingInfo::SpecificNode(route), - )) => { - let route_single: SingleNodeRoutingInfo = Some(route).into(); - let conn = crate::cluster_async::ClusterConnInner::get_connection( - route_single.into(), - core.clone(), - Some(Arc::new(cmd.clone())), - ); - let (address, conn) = conn.await.map_err(|err| { - types::RedisError::from(( - types::ErrorKind::ConnectionNotFoundForRoute, - "Requested connection not found", - err.to_string(), - )) - })?; - Self::add_command_to_pipeline_map( - &mut pipelines_by_connection, - address, - conn, - cmd.clone(), - index, - None, - ); - } - Some(cluster_routing::RoutingInfo::SingleNode( - SingleNodeRoutingInfo::RandomPrimary, - )) => { - let route_single: SingleNodeRoutingInfo = - Some(Route::new_random_primary()).into(); - let conn = crate::cluster_async::ClusterConnInner::get_connection( - route_single.into(), - core.clone(), - Some(Arc::new(cmd.clone())), - ); - let (address, conn) = conn.await.map_err(|err| { - types::RedisError::from(( - types::ErrorKind::ConnectionNotFoundForRoute, - "Requested connection not found", - err.to_string(), - )) - })?; - Self::add_command_to_pipeline_map( - &mut pipelines_by_connection, - address, - conn, - cmd.clone(), - index, - None, - ); - } - Some(cluster_routing::RoutingInfo::SingleNode( - SingleNodeRoutingInfo::ByAddress { host, port }, - )) => { - let address = format!("{host}:{port}"); - let conn = crate::cluster_async::ClusterConnInner::get_connection( - InternalSingleNodeRouting::ByAddress(address.clone()), - core.clone(), - Some(Arc::new(cmd.clone())), - ); - let (address, conn) = conn.await.map_err(|err| { - types::RedisError::from(( - types::ErrorKind::ConnectionNotFoundForRoute, - "Requested connection not found", - err.to_string(), - )) - })?; - Self::add_command_to_pipeline_map( - &mut pipelines_by_connection, - address, - conn, - cmd.clone(), - index, - None, - ); - } - Some(cluster_routing::RoutingInfo::MultiNode(( - multi_node_routing, - response_policy, - ))) => { - // save the routing info and response policy, so we will be able to aggregate the results later - response_policies.push((index, multi_node_routing.clone(), response_policy)); - match multi_node_routing { - MultipleNodeRoutingInfo::AllNodes => { - let all_nodes: Vec<_> = { - let lock = core.conn_lock.read().expect(MUTEX_READ_ERR); - lock.all_node_connections().collect() - }; - for (inner_index, (address, conn)) in all_nodes.into_iter().enumerate() - { - Self::add_command_to_pipeline_map( - &mut pipelines_by_connection, - address, - conn.await, - cmd.clone(), - index, - Some(inner_index), - ); - } - } - MultipleNodeRoutingInfo::AllMasters => { - let all_primaries: Vec<_> = { - let lock = core.conn_lock.read().expect(MUTEX_READ_ERR); - lock.all_primary_connections().collect() - }; - for (inner_index, (address, conn)) in - all_primaries.into_iter().enumerate() - { - Self::add_command_to_pipeline_map( - &mut pipelines_by_connection, - address, - conn.await, - cmd.clone(), - index, - Some(inner_index), - ); - } - } - MultipleNodeRoutingInfo::MultiSlot((slots, _)) => { - for (inner_index, (route, indices)) in slots.iter().enumerate() { - let conn = { - let lock = core.conn_lock.read().expect(MUTEX_READ_ERR); - lock.connection_for_route(route) - }; - if let Some((address, conn)) = conn { - let new_cmd = - crate::cluster_routing::command_for_multi_slot_indices( - cmd, - indices.iter(), - ); - Self::add_command_to_pipeline_map( - &mut pipelines_by_connection, - address, - conn.await, - new_cmd, - index, - Some(inner_index), - ); - } - } - } - } - } - } + + /// Aggregates responses for multi-node commands and updates the `values_and_addresses` vector. + /// + /// This function processes the provided `response_policies`, which contain information about how responses + /// from multiple nodes should be aggregated. For each policy: + /// - It collects responses and their source node addresses from the corresponding entry in `values_and_addresses`. + /// - Uses the routing information and optional response policy to aggregate the responses into a single result. + /// + /// The aggregated result replaces the existing entries in `values_and_addresses` for the given command index. + async fn aggregate_pipeline_multi_node_commands( + values_and_addresses: &mut [Vec<(Value, String)>], + response_policies: Vec<(usize, MultipleNodeRoutingInfo, Option)>, + ) -> Result<(), (OperationTarget, RedisError)> { + for (index, routing_info, response_policy) in response_policies { + let response_receivers = values_and_addresses[index] + .iter() + .map(|(value, address)| { + let (sender, receiver) = oneshot::channel(); + let _ = sender.send(Ok(Response::Single(value.clone()))); + (Some(address.clone()), receiver) + }) + .collect(); + + let aggregated_response = + Self::aggregate_results(response_receivers, &routing_info, response_policy) + .await + .map_err(|err| (OperationTarget::FanOut, err))?; + + values_and_addresses[index] = vec![(aggregated_response, "".to_string())]; } - Ok((pipelines_by_connection, response_policies)) + Ok(()) } + async fn try_request(info: RequestInfo, core: Core) -> OperationResult { match info.cmd { CmdArg::Cmd { cmd, routing } => Self::try_cmd_request(cmd, routing, core).await, @@ -2380,6 +2148,7 @@ where route, } => { if pipeline.is_atomic() { + // If the pipeline is atomic (i.e., a transaction), we can send it as is, with no need to split it into sub-pipelines. Self::try_pipeline_request( pipeline, offset, @@ -2388,74 +2157,36 @@ where ) .await } else { + // The pipeline is not atomic, we need to split it into sub-pipelines and send them separately. + + // Distribute pipeline commands across cluster nodes based on routing information. + // Returns: + // - pipelines_by_connection: Map of node addresses to their pipeline contexts + // - response_policies: List of response aggregation policies for multi-node operations let (pipelines_by_connection, response_policies) = - Self::routes_pipeline_commands(&pipeline, core.clone()) + map_pipeline_to_nodes(&pipeline, core.clone()) .await .map_err(|err| (OperationTarget::FanOut, err))?; + // Stores responses along with their source node addresses for each pipeline command. + // + // The outer `Vec` represents the pipeline commands, and each inner `Vec` contains (response, address) pairs. + // Since some commands can be executed across multiple nodes (e.g., multi-node commands), a single command + // might produce multiple responses, each from a different node. By storing the responses with their + // respective node addresses, we ensure that we have all the information needed to aggregate the results later. + // This structure is essential for handling scenarios where responses from multiple nodes must be combined. let mut values_and_addresses = vec![Vec::new(); pipeline.len()]; - let mut first_error = None; let mut final_responses: Vec = Vec::with_capacity(pipeline.len()); let mut join_set = tokio::task::JoinSet::new(); // Manage spawned tasks - for (address, (pipeline, conn, indices)) in pipelines_by_connection { - // Spawn the async task - join_set.spawn(async move { - let count = pipeline.len(); - let result = - Self::try_pipeline_request(Arc::new(pipeline), 0, count, async { - Ok((address.clone(), conn)) - }) - .await?; - match result { - Response::Multiple(values) => Ok((indices, values, address)), - _ => Err(( - OperationTarget::FanOut, - RedisError::from(( - ErrorKind::ResponseError, - "Unsupported response type", - )), - )), - } - }); + for (address, node_context) in pipelines_by_connection { + // Spawn a new task to execute the pipeline on the node + join_set.spawn(execute_pipeline_on_node(address, node_context)); } // Wait for all spawned tasks to complete - while let Some(future_result) = join_set.join_next().await { - match future_result { - Err(e) => { - return Err(( - OperationTarget::FanOut, - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) - .into(), - )); - } - Ok(Ok((indices, values, address))) => { - for ((index, inner_index), value) in indices.into_iter().zip(values) - { - if let Some(inner_index) = inner_index { - // Ensure the vector is big enough to hold `inner_index` - if values_and_addresses[index].len() <= inner_index { - values_and_addresses[index].resize( - inner_index + 1, - (Value::Nil, "".to_string()), - ); - } - // Add the value to the specific inner_index within index - values_and_addresses[index][inner_index] = - (value, address.clone()); - } else { - values_and_addresses[index].push((value, address.clone())); - } - } - } - Ok(Err(e)) => { - if first_error.is_none() { - first_error = Some(e); - } - } - } - } + let first_error = + collect_pipeline_tasks(&mut join_set, &mut values_and_addresses).await?; // Check for errors if let Some(first_error) = first_error { @@ -2463,36 +2194,15 @@ where } // Process response policies after all tasks are complete - for (index, routing_info, response_policy) in response_policies { - #[allow(clippy::type_complexity)] - // Safely access `values_and_addresses` for the current index - let response_receivers: Vec<( - Option, - oneshot::Receiver>, - )> = values_and_addresses[index] - .iter() - .map(|(value, address)| { - let (sender, receiver) = oneshot::channel(); - let _ = sender.send(Ok(Response::Single(value.clone()))); - (Some(address.clone()), receiver) - }) - .collect(); - - let aggregated_response = Self::aggregate_results( - response_receivers, - &routing_info, - response_policy, - ) - .await - .map_err(|err| (OperationTarget::FanOut, err))?; - - // Update `values_and_addresses` for the current index - values_and_addresses[index] = vec![(aggregated_response, "".to_string())]; - } + Self::aggregate_pipeline_multi_node_commands( + &mut values_and_addresses, + response_policies, + ) + .await?; // Collect final responses for mut value in values_and_addresses.into_iter() { - assert_eq!(value.len(), 1); + // unwrap() is safe here because we know that the vector is not empty final_responses.push(value.pop().unwrap().0); } @@ -2523,7 +2233,7 @@ where } } - async fn get_connection( + pub async fn get_connection( routing: InternalSingleNodeRouting, core: Core, cmd: Option>, @@ -3164,7 +2874,7 @@ impl Connect for MultiplexedConnection { #[cfg(test)] mod pipeline_routing_tests { - use super::route_for_pipeline; + use super::pipeline_routing::route_for_pipeline; use crate::{ cluster_routing::{Route, SlotAddr}, cmd, diff --git a/glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs b/glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs new file mode 100644 index 0000000000..1b980c479d --- /dev/null +++ b/glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs @@ -0,0 +1,433 @@ +use crate::aio::ConnectionLike; +use crate::cluster_async::ClusterConnInner; +use crate::cluster_async::Connect; +use crate::cluster_routing::{ + command_for_multi_slot_indices, MultipleNodeRoutingInfo, ResponsePolicy, SingleNodeRoutingInfo, +}; +use crate::{cluster_routing, RedisResult, Value}; +use crate::{cluster_routing::Route, Cmd, ErrorKind, RedisError}; +use std::collections::HashMap; +use std::sync::Arc; + +use crate::cluster_async::MUTEX_READ_ERR; +use crate::Pipeline; +use rand::prelude::IteratorRandom; + +use super::{Core, InternalSingleNodeRouting, OperationTarget, Response}; + +/// Represents a pipeline command execution context for a specific node +#[derive(Default)] +pub struct NodePipelineContext { + /// The pipeline of commands to be executed + pub pipeline: Pipeline, + /// The connection to the node + pub connection: C, + /// Vector of (command_index, inner_index) pairs tracking command order + /// command_index: Position in the original pipeline + /// inner_index: Optional sub-index for multi-node operations (e.g. MSET) + pub command_indices: Vec<(usize, Option)>, +} + +/// Maps node addresses to their pipeline execution contexts +pub type NodePipelineMap = HashMap>; + +impl NodePipelineContext { + fn new(connection: C) -> Self { + Self { + pipeline: Pipeline::new(), + connection, + command_indices: Vec::new(), + } + } + + // Adds a command to the pipeline and records its index + fn add_command(&mut self, cmd: Cmd, index: usize, inner_index: Option) { + self.pipeline.add_command(cmd); + self.command_indices.push((index, inner_index)); + } +} + +/// Adds a command to the pipeline map for a specific node address. +pub fn add_command_to_node_pipeline_map( + pipeline_map: &mut NodePipelineMap, + address: String, + connection: C, + cmd: Cmd, + index: usize, + inner_index: Option, +) { + pipeline_map + .entry(address) + .or_insert_with(|| NodePipelineContext::new(connection)) + .add_command(cmd, index, inner_index); +} + +/// Adds a command to a random existing node pipeline in the pipeline map +pub fn add_command_to_random_existing_node( + pipeline_map: &mut NodePipelineMap, + cmd: Cmd, + index: usize, +) -> RedisResult<()> { + let mut rng = rand::thread_rng(); + if let Some(node_context) = pipeline_map.values_mut().choose(&mut rng) { + node_context.add_command(cmd, index, None); + Ok(()) + } else { + Err(RedisError::from((ErrorKind::IoError, "No nodes available"))) + } +} + +/// Handles multi-slot commands within a pipeline. +/// +/// This function processes commands with routing information indicating multiple slots +/// (e.g., `MSET` or `MGET`), splits them into sub-commands based on their target slots, +/// and assigns these sub-commands to the appropriate pipelines for the corresponding nodes. +/// +/// ### Parameters: +/// - `pipelines_by_connection`: A mutable map of node pipelines where the commands will be added. +/// - `core`: The core structure that provides access to connection management. +/// - `cmd`: The original multi-slot command that needs to be split. +/// - `index`: The index of the original command within the pipeline. +/// - `slots`: A vector containing routing information. Each entry includes: +/// - `Route`: The specific route for the slot. +/// - `Vec`: Indices of the keys within the command that map to this slot. +pub async fn handle_pipeline_multi_slot_routing( + pipelines_by_connection: &mut NodePipelineMap, + core: Core, + cmd: &Cmd, + index: usize, + slots: Vec<(Route, Vec)>, +) where + C: Clone, +{ + // inner_index is used to keep track of the index of the sub-command inside cmd + for (inner_index, (route, indices)) in slots.iter().enumerate() { + let conn = { + let lock = core.conn_lock.read().expect(MUTEX_READ_ERR); + lock.connection_for_route(route) + }; + if let Some((address, conn)) = conn { + // create the sub-command for the slot + let new_cmd = command_for_multi_slot_indices(cmd, indices.iter()); + add_command_to_node_pipeline_map( + pipelines_by_connection, + address, + conn.await, + new_cmd, + index, + Some(inner_index), + ); + } + } +} + +/// Handles pipeline commands that require single-node routing. +/// +/// This function processes commands with `SingleNode` routing information and determines +/// the appropriate handling based on the routing type. +/// +/// ### Parameters: +/// - `pipeline_map`: A mutable reference to the `NodePipelineMap`, representing the pipelines grouped by nodes. +/// - `cmd`: The command to process and add to the appropriate node pipeline. +/// - `routing`: The single-node routing information, which determines how the command is routed. +/// - `core`: The core object responsible for managing connections and routing logic. +/// - `index`: The position of the command in the overall pipeline. +pub async fn handle_pipeline_single_node_routing( + pipeline_map: &mut NodePipelineMap, + cmd: Cmd, + routing: InternalSingleNodeRouting, + core: Core, + index: usize, +) -> Result<(), (OperationTarget, RedisError)> +where + C: Clone + ConnectionLike + Connect + Send + Sync + 'static, +{ + if matches!(routing, InternalSingleNodeRouting::Random) && !pipeline_map.is_empty() { + // The routing info is to a random node, and we already have sub-pipelines within our pipelines map, so just add it to a random sub-pipeline + add_command_to_random_existing_node(pipeline_map, cmd, index) + .map_err(|err| (OperationTarget::NotFound, err))?; + Ok(()) + } else { + let (address, conn) = + ClusterConnInner::get_connection(routing, core, Some(Arc::new(cmd.clone()))) + .await + .map_err(|err| (OperationTarget::NotFound, err))?; + add_command_to_node_pipeline_map(pipeline_map, address, conn, cmd, index, None); + Ok(()) + } +} + +/// Maps the commands in a pipeline to the appropriate nodes based on their routing information. +/// +/// This function processes each command in the given pipeline, determines its routing information, +/// and organizes it into a map of node pipelines. It handles both single-node and multi-node routing +/// strategies and ensures that the commands are distributed accordingly. +/// +/// It also collects response policies for multi-node routing and returns them along with the pipeline map. +/// This is to ensure we can aggregate responses from properly from the different nodes. +/// +/// # Arguments +/// +/// * `pipeline` - A reference to the pipeline containing the commands to route. +/// * `core` - The core object that provides access to connection locks and other resources. +/// +/// # Returns +/// +/// A `RedisResult` containing a tuple: +/// +/// - A `NodePipelineMap` where commands are grouped by their corresponding nodes (as pipelines). +/// - A `Vec<(usize, MultipleNodeRoutingInfo, Option)>` containing the routing information +/// and response policies for multi-node commands, along with the index of the command in the pipeline, for aggregating the responses later. +pub async fn map_pipeline_to_nodes( + pipeline: &crate::Pipeline, + core: Core, +) -> RedisResult<( + NodePipelineMap, + Vec<(usize, MultipleNodeRoutingInfo, Option)>, +)> +where + C: Clone + ConnectionLike + Connect + Send + Sync + 'static, +{ + let mut pipelines_by_connection = NodePipelineMap::new(); + let mut response_policies = Vec::new(); + + for (index, cmd) in pipeline.cmd_iter().enumerate() { + match cluster_routing::RoutingInfo::for_routable(cmd).unwrap_or( + cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random), + ) { + cluster_routing::RoutingInfo::SingleNode(route) => { + handle_pipeline_single_node_routing( + &mut pipelines_by_connection, + cmd.clone(), + route.into(), + core.clone(), + index, + ) + .await + .map_err(|(_target, err)| err)?; + } + + cluster_routing::RoutingInfo::MultiNode((multi_node_routing, response_policy)) => { + //save the routing info and response policy, so we will be able to aggregate the results later + response_policies.push((index, multi_node_routing.clone(), response_policy)); + match multi_node_routing { + MultipleNodeRoutingInfo::AllNodes | MultipleNodeRoutingInfo::AllMasters => { + let connections: Vec<_> = { + let lock = core.conn_lock.read().expect(MUTEX_READ_ERR); + if matches!(multi_node_routing, MultipleNodeRoutingInfo::AllNodes) { + lock.all_node_connections().collect() + } else { + lock.all_primary_connections().collect() + } + }; + for (inner_index, (address, conn)) in connections.into_iter().enumerate() { + add_command_to_node_pipeline_map( + &mut pipelines_by_connection, + address, + conn.await, + cmd.clone(), + index, + Some(inner_index), + ); + } + } + MultipleNodeRoutingInfo::MultiSlot((slots, _)) => { + handle_pipeline_multi_slot_routing( + &mut pipelines_by_connection, + core.clone(), + cmd, + index, + slots, + ) + .await; + } + } + } + } + } + Ok((pipelines_by_connection, response_policies)) +} + +/// Executes a pipeline of commands on a specified node. +/// +/// This function sends a batch of commands (pipeline) to the specified node for execution. +/// +/// ### Parameters: +/// - `address`: The address of the target node where the pipeline commands should be executed. +/// - `node_context`: The `NodePipelineContext` containing the pipeline commands and the associated connection. +/// +/// ### Returns: +/// - `Ok((Vec<(usize, Option)>, Vec, String))`: +/// - A vector of command indices (`usize`) and their respective inner indices (`Option`) in the pipeline. +/// - A vector of `Value` objects representing the responses from the executed pipeline. +/// - The address of the node where the pipeline was executed. +/// - `Err((OperationTarget, RedisError))`: +/// - An error tuple containing the target operation and the corresponding error details if execution fails. +pub async fn execute_pipeline_on_node( + address: String, + node_context: NodePipelineContext, +) -> Result<(Vec<(usize, Option)>, Vec, String), (OperationTarget, RedisError)> +where + C: Clone + ConnectionLike + Connect + Send + Sync + 'static, +{ + let count = node_context.pipeline.len(); + let result = + ClusterConnInner::try_pipeline_request(Arc::new(node_context.pipeline), 0, count, async { + Ok((address.clone(), node_context.connection)) + }) + .await?; + + match result { + Response::Multiple(values) => Ok((node_context.command_indices, values, address)), + _ => Err(( + OperationTarget::FanOut, + RedisError::from((ErrorKind::ResponseError, "Unsupported response type")), + )), + } +} + +/// Adds the result of a pipeline command to the `values_and_addresses` collection. +/// +/// This function updates the `values_and_addresses` vector at the given `index` and optionally at the +/// `inner_index` if provided. If `inner_index` is `Some`, it ensures that the vector at that index is large enough +/// to hold the value and address at the specified position, resizing it if necessary. If `inner_index` is `None`, +/// the value and address are simply appended to the vector. +/// +/// # Parameters +/// - `values_and_addresses`: A mutable reference to a vector of vectors that stores the results of pipeline commands. +/// - `index`: The index in `values_and_addresses` where the result should be stored. +/// - `inner_index`: An optional index within the vector at `index`, used to store the result at a specific position. +/// - `value`: The result value to store. +/// - `address`: The address associated with the result. +pub fn add_pipeline_result( + values_and_addresses: &mut [Vec<(Value, String)>], + index: usize, + inner_index: Option, + value: Value, + address: String, +) { + match inner_index { + Some(inner_index) => { + // Ensure the vector at the given index is large enough to hold the value and address at the specified position + if values_and_addresses[index].len() <= inner_index { + values_and_addresses[index].resize(inner_index + 1, (Value::Nil, "".to_string())); + } + values_and_addresses[index][inner_index] = (value, address); + } + None => values_and_addresses[index].push((value, address)), + } +} + +/// Collects and processes the results of pipeline tasks from a `tokio::task::JoinSet`. +/// +/// This function iteratively retrieves completed tasks from the provided `join_set` and processes +/// their results. Successful results are added to the `values_and_addresses` vector using the +/// indices and values provided. If an error occurs in any task, it is recorded and returned as +/// the first encountered error. +/// +/// # Parameters +/// - `join_set`: A mutable reference to a `tokio::task::JoinSet` containing tasks that return: +/// - `Ok((Vec<(usize, Option)>, Vec, String))`: On success, a tuple of: +/// - A list of indices and optional inner indices corresponding to pipeline commands. +/// - A list of `Value` results from the executed pipeline. +/// - The `String` address where the task was executed. +/// - `Err((OperationTarget, RedisError))`: On failure, an error detailing the operation target and the Redis error. +/// - `values_and_addresses`: A mutable slice of vectors, where each vector corresponds to a pipeline +/// command's results. This is updated with the values and addresses from successful tasks. +/// +/// # Returns +/// - `Ok(Some((OperationTarget, RedisError)))`: If one or more tasks encountered an error, returns the first error. +/// - `Ok(None)`: If all tasks completed successfully. +/// - `Err((OperationTarget::FanOut, RedisError))`: If a task failed unexpectedly (e.g., due to a panic). +/// +/// +/// # Behavior +/// - Processes successful results by calling `add_pipeline_result` to update the +/// `values_and_addresses` vector with the indices, values, and addresses. +/// - Records the first error encountered and continues processing the remaining tasks. +/// - Returns `Ok(None)` if all tasks complete successfully. +#[allow(clippy::type_complexity)] +pub async fn collect_pipeline_tasks( + join_set: &mut tokio::task::JoinSet< + Result<(Vec<(usize, Option)>, Vec, String), (OperationTarget, RedisError)>, + >, + values_and_addresses: &mut [Vec<(Value, String)>], +) -> Result, (OperationTarget, RedisError)> { + let mut first_error = None; + + while let Some(future_result) = join_set.join_next().await { + match future_result { + Ok(Ok((indices, values, address))) => { + for ((index, inner_index), value) in indices.into_iter().zip(values) { + add_pipeline_result( + values_and_addresses, + index, + inner_index, + value, + address.clone(), + ); + } + } + Ok(Err(e)) => first_error = first_error.or(Some(e)), + Err(e) => { + return Err(( + OperationTarget::FanOut, + std::io::Error::new(std::io::ErrorKind::Other, e.to_string()).into(), + )) + } + } + } + Ok(first_error) +} + +// This function returns the rout for a given pipeline. +// The function goes over the commands in the pipeline, checks that all key-based commands are routed to the same slot, +// and returns the route for that specific node. +// If the pipelines contains no key-base commands, the function returns None. +// For non-anomic pipeline, the function will return None, regardless of the commands in it. +pub fn route_for_pipeline(pipeline: &crate::Pipeline) -> RedisResult> { + fn route_for_command(cmd: &Cmd) -> Option { + match cluster_routing::RoutingInfo::for_routable(cmd) { + Some(cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) => None, + Some(cluster_routing::RoutingInfo::SingleNode( + SingleNodeRoutingInfo::SpecificNode(route), + )) => Some(route), + Some(cluster_routing::RoutingInfo::SingleNode( + SingleNodeRoutingInfo::RandomPrimary, + )) => Some(Route::new_random_primary()), + Some(cluster_routing::RoutingInfo::MultiNode(_)) => None, + Some(cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::ByAddress { + .. + })) => None, + None => None, + } + } + + if pipeline.is_atomic() { + // Find first specific slot and send to it. There's no need to check If later commands + // should be routed to a different slot, since the server will return an error indicating this. + pipeline + .cmd_iter() + .map(route_for_command) + .try_fold(None, |chosen_route, next_cmd_route| { + match (chosen_route, next_cmd_route) { + (None, _) => Ok(next_cmd_route), + (_, None) => Ok(chosen_route), + (Some(chosen_route), Some(next_cmd_route)) => { + if chosen_route.slot() != next_cmd_route.slot() { + Err(( + ErrorKind::CrossSlot, + "Received crossed slots in transaction", + ) + .into()) + } else { + Ok(Some(chosen_route)) + } + } + } + }) + } else { + // Pipeline is not atomic, so we can have commands with different slots. + Ok(None) + } +}