Skip to content

Commit

Permalink
remove sub-pipeline
Browse files Browse the repository at this point in the history
Signed-off-by: Shoham Elias <[email protected]>
  • Loading branch information
shohamazon committed Jan 29, 2025
1 parent deee614 commit 96dc68b
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 18 deletions.
5 changes: 4 additions & 1 deletion glide-core/redis-rs/redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ where
offset,
count,
route: route.into(),
sub_pipeline: false,
},
sender,
})
Expand Down Expand Up @@ -612,6 +613,7 @@ enum CmdArg<C> {
offset: usize,
count: usize,
route: InternalSingleNodeRouting<C>,
sub_pipeline: bool,
},
ClusterScan {
// struct containing the arguments for the cluster scan command - scan state cursor, match pattern, count and object type.
Expand Down Expand Up @@ -2115,8 +2117,9 @@ where
offset,
count,
route,
sub_pipeline,
} => {
if pipeline.is_atomic() || pipeline.is_sub_pipeline() {
if pipeline.is_atomic() || sub_pipeline {
// If the pipeline is atomic (i.e., a transaction) or if the pipeline is already splitted into sub-pipelines, we can send it as is, with no need to split it into sub-pipelines.
Self::try_pipeline_request(
pipeline,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,8 @@ pub type NodePipelineMap<C> = HashMap<String, NodePipelineContext<C>>;

impl<C> NodePipelineContext<C> {
fn new(connection: C) -> Self {
let mut pipeline = Pipeline::new();
pipeline.sub_pipeline();
Self {
pipeline,
pipeline: Pipeline::new(),
connection,
command_indices: Vec::new(),
}
Expand Down Expand Up @@ -311,6 +309,8 @@ where
address: address.clone(),
conn: async { context.connection }.boxed().shared(),
},
// mark it as a sub-pipeline mode
sub_pipeline: true,
},
},
});
Expand Down
14 changes: 0 additions & 14 deletions glide-core/redis-rs/redis/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ pub struct Pipeline {
commands: Vec<Cmd>,
transaction_mode: bool,
ignored_commands: HashSet<usize>,
is_sub_pipeline: bool,
}

/// A pipeline allows you to send multiple commands in one go to the
Expand Down Expand Up @@ -49,7 +48,6 @@ impl Pipeline {
commands: Vec::with_capacity(capacity),
transaction_mode: false,
ignored_commands: HashSet::new(),
is_sub_pipeline: false,
}
}

Expand All @@ -72,13 +70,6 @@ impl Pipeline {
self
}

/// Enables sub-pipeline mode, indicating that this pipeline is part of a larger pipeline
/// split across multiple nodes.
pub fn sub_pipeline(&mut self) -> &mut Pipeline {
self.is_sub_pipeline = true;
self
}

/// Returns the encoded pipeline commands.
pub fn get_packed_pipeline(&self) -> Vec<u8> {
encode_pipeline(&self.commands, self.transaction_mode)
Expand Down Expand Up @@ -230,11 +221,6 @@ impl Pipeline {
pub fn is_empty(&self) -> bool {
self.commands.is_empty()
}

/// Returns whether the pipeline is in sub-pipeline mode.
pub fn is_sub_pipeline(&self) -> bool {
self.is_sub_pipeline
}
}

fn encode_pipeline(cmds: &[Cmd], atomic: bool) -> Vec<u8> {
Expand Down

0 comments on commit 96dc68b

Please sign in to comment.