diff --git a/src/client.rs b/src/client.rs index c9c9b85..59c4db4 100644 --- a/src/client.rs +++ b/src/client.rs @@ -49,7 +49,7 @@ pub struct Kafka { pub manager: Arc>, pub operation_retry_options: OperationRetryOptions, pub executor: Arc, - pub cluster_meta: Arc, + pub cluster: Arc, supported_versions: Arc>, } @@ -122,20 +122,20 @@ impl Kafka { manager, operation_retry_options, executor, - cluster_meta: Arc::new(Cluster::default()), + cluster: Arc::new(Cluster::default()), supported_versions: Arc::new(supported_versions), }) } pub fn topic_id(&self, topic_name: &TopicName) -> Uuid { - match self.cluster_meta.topic_id(topic_name) { + match self.cluster.topic_id(topic_name) { Some(topic_id) => topic_id, None => Uuid::nil(), } } pub fn partitions(&self, topic: &TopicName) -> Result { - self.cluster_meta.partitions(topic) + self.cluster.partitions(topic) } pub fn version_range(&self, key: ApiKey) -> Option { @@ -284,7 +284,7 @@ impl Kafka { let request = RequestKind::MetadataRequest(request); let response = self.manager.invoke(&self.manager.url, request).await?; if let ResponseKind::MetadataResponse(metadata) = response { - self.cluster_meta.update_metadata(metadata) + self.cluster.update_metadata(metadata) } else { Err(Error::Connection(ConnectionError::UnexpectedResponse( format!("{response:?}"), @@ -293,8 +293,8 @@ impl Kafka { } pub async fn update_full_metadata(&self) -> Result<()> { - let mut topics = Vec::with_capacity(self.cluster_meta.topics.len()); - for topic in self.cluster_meta.topics.iter() { + let mut topics = Vec::with_capacity(self.cluster.topics.len()); + for topic in self.cluster.topics.iter() { topics.push(topic.key().clone()); } self.update_metadata(topics).await diff --git a/src/consumer/fetcher.rs b/src/consumer/fetcher.rs index c803f83..90a8e33 100644 --- a/src/consumer/fetcher.rs +++ b/src/consumer/fetcher.rs @@ -61,7 +61,7 @@ impl Fetcher { completed_fetches_tx: mpsc::UnboundedSender, ) -> Self { let sessions = DashMap::new(); - for node in client.cluster_meta.nodes.iter() { + for node in client.cluster.nodes.iter() { sessions.insert(node.id, FetchSession::new(node.id)); } @@ -100,7 +100,7 @@ impl Fetcher { version = 12; } let metadata = fetch_request_data.metadata; - if let Some(node) = self.client.cluster_meta.nodes.get(&node) { + if let Some(node) = self.client.cluster.nodes.get(&node) { let fetch_request = self.fetch_builder(&mut fetch_request_data, version).await?; trace!("Send fetch request: {:?}", fetch_request); @@ -239,7 +239,7 @@ impl Fetcher { match rx.await { Ok(partitions) => { for tp in partitions { - let current_leader = self.client.cluster_meta.current_leader(&tp); + let current_leader = self.client.cluster.current_leader(&tp); self.event_tx.unbounded_send( CoordinatorEvent::MaybeValidatePositionForCurrentLeader { partition: tp, @@ -428,7 +428,7 @@ impl Fetcher { let position = FetchPosition { offset: offset_data.offset - 1, offset_epoch: None, - current_leader: self.client.cluster_meta.current_leader(&partition), + current_leader: self.client.cluster.current_leader(&partition), }; // TODO: metadata update last seen epoch if newer self.event_tx @@ -603,8 +603,8 @@ impl Fetcher { offset_reset_timestamps: &mut HashMap, ) -> Result> { let mut node_request = HashMap::new(); - for node_entry in self.client.cluster_meta.nodes.iter() { - if let Ok(node_topology) = self.client.cluster_meta.drain_node(node_entry.value().id) { + for node_entry in self.client.cluster.nodes.iter() { + if let Ok(node_topology) = self.client.cluster.drain_node(node_entry.value().id) { let partitions = node_topology.value(); let mut topics = HashMap::new(); diff --git a/src/coordinator/consumer.rs b/src/coordinator/consumer.rs index c8fe59e..e3a1d5d 100644 --- a/src/coordinator/consumer.rs +++ b/src/coordinator/consumer.rs @@ -557,7 +557,7 @@ impl CoordinatorInner { }; let mut tp_state = TopicPartitionState::new(*partition); tp_state.position.current_leader = - self.client.cluster_meta.current_leader(&tp); + self.client.cluster.current_leader(&tp); self.subscriptions.assignments.insert(tp, tp_state); } } @@ -837,7 +837,7 @@ impl CoordinatorInner { match self.group_meta.protocol_name { Some(ref protocol) => { let assignor = self.look_up_assignor(&protocol.to_string())?; - let cluster = self.client.cluster_meta.clone(); + let cluster = self.client.cluster.clone(); request.assignments = serialize_assignments(assignor.assign(cluster, &self.group_subscription)?)?; } @@ -866,7 +866,7 @@ impl CoordinatorInner { if version <= 7 { let mut topics = Vec::with_capacity(self.subscriptions.topics.len()); for assign in self.subscriptions.topics.iter() { - let partitions = self.client.cluster_meta.partitions(assign)?; + let partitions = self.client.cluster.partitions(assign)?; let mut topic = OffsetFetchRequestTopic::default(); topic.name = assign.clone(); @@ -878,7 +878,7 @@ impl CoordinatorInner { } else { let mut topics = Vec::with_capacity(self.subscriptions.topics.len()); for assign in self.subscriptions.topics.iter() { - let partitions = self.client.cluster_meta.partitions(assign)?; + let partitions = self.client.cluster.partitions(assign)?; let mut topic = OffsetFetchRequestTopics::default(); topic.name = assign.clone(); diff --git a/src/producer/mod.rs b/src/producer/mod.rs index 31e1369..3a986dd 100644 --- a/src/producer/mod.rs +++ b/src/producer/mod.rs @@ -332,8 +332,8 @@ impl Producer { encode_options: &RecordEncodeOptions, ) -> Result> { let mut result = Vec::new(); - for node_entry in self.client.cluster_meta.nodes.iter() { - if let Ok(node) = self.client.cluster_meta.drain_node(node_entry.value().id) { + for node_entry in self.client.cluster.nodes.iter() { + if let Ok(node) = self.client.cluster.drain_node(node_entry.value().id) { let partitions = node.value(); if partitions.is_empty() { continue; @@ -448,7 +448,7 @@ impl TopicProducer { pub async fn new(client: Arc>, topic: TopicName) -> Result>> { client.update_metadata(vec![topic.clone()]).await?; - let partitions = client.cluster_meta.partitions(&topic)?; + let partitions = client.cluster.partitions(&topic)?; let partitions = partitions.value(); let num_partitions = partitions.len(); let batches = DashMap::with_capacity_and_hasher(num_partitions, FxBuildHasher::default()); @@ -477,7 +477,7 @@ impl TopicProducer { &self.topic, record.key(), record.value(), - &self.client.cluster_meta, + &self.client.cluster, )?; } return match self.batches.get_mut(&partition) {