Skip to content

Commit

Permalink
[rust]support message type check on rust client (#658)
Browse files Browse the repository at this point in the history
* support message type check on rust client

* update test cases

* simplify MessageType conversion

* remove unused function

* fix code fmt issue

* fix cargo fmt issue

* Use strong type in application code and primitive type for wire data only

Signed-off-by: Li Zhanhui <[email protected]>

---------

Signed-off-by: Li Zhanhui <[email protected]>
Co-authored-by: Li Zhanhui <[email protected]>
  • Loading branch information
glcrazier and lizhanhui authored Jan 25, 2024
1 parent 9004405 commit 65b70ed
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 15 deletions.
3 changes: 3 additions & 0 deletions rust/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ pub enum ErrorKind {
#[error("Message is invalid")]
InvalidMessage,

#[error("Message type not match with topic accept message type")]
MessageTypeNotMatch,

#[error("Server error")]
Server,

Expand Down
29 changes: 29 additions & 0 deletions rust/src/model/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,18 @@ use std::collections::HashMap;

use crate::error::{ClientError, ErrorKind};
use crate::model::common::Endpoints;
use crate::model::message::MessageType::{DELAY, FIFO, NORMAL, TRANSACTION};
use crate::model::message_id::UNIQ_ID_GENERATOR;
use crate::pb;

#[derive(Clone, Copy, Debug)]
pub enum MessageType {
NORMAL = 1,
FIFO = 2,
DELAY = 3,
TRANSACTION = 4,
}

/// [`Message`] is the data model for sending.
pub trait Message {
fn take_message_id(&mut self) -> String;
Expand All @@ -35,6 +44,17 @@ pub trait Message {
fn take_message_group(&mut self) -> Option<String>;
fn take_delivery_timestamp(&mut self) -> Option<i64>;
fn transaction_enabled(&mut self) -> bool;
fn get_message_type(&self) -> MessageType;
}

pub trait MessageTypeAware {
fn accept_type(&self, message_type: MessageType) -> bool;
}

impl MessageTypeAware for pb::MessageQueue {
fn accept_type(&self, message_type: MessageType) -> bool {
self.accept_message_types.contains(&(message_type as i32))
}
}

pub(crate) struct MessageImpl {
Expand All @@ -47,6 +67,7 @@ pub(crate) struct MessageImpl {
pub(crate) message_group: Option<String>,
pub(crate) delivery_timestamp: Option<i64>,
pub(crate) transaction_enabled: bool,
pub(crate) message_type: MessageType,
}

impl Message for MessageImpl {
Expand Down Expand Up @@ -85,6 +106,10 @@ impl Message for MessageImpl {
fn transaction_enabled(&mut self) -> bool {
self.transaction_enabled
}

fn get_message_type(&self) -> MessageType {
self.message_type
}
}

/// [`MessageBuilder`] is the builder for [`Message`].
Expand All @@ -108,6 +133,7 @@ impl MessageBuilder {
message_group: None,
delivery_timestamp: None,
transaction_enabled: false,
message_type: NORMAL,
},
}
}
Expand Down Expand Up @@ -135,6 +161,7 @@ impl MessageBuilder {
message_group: Some(message_group.into()),
delivery_timestamp: None,
transaction_enabled: false,
message_type: FIFO,
},
}
}
Expand Down Expand Up @@ -162,6 +189,7 @@ impl MessageBuilder {
message_group: None,
delivery_timestamp: Some(delay_time),
transaction_enabled: false,
message_type: DELAY,
},
}
}
Expand All @@ -184,6 +212,7 @@ impl MessageBuilder {
message_group: None,
delivery_timestamp: None,
transaction_enabled: true,
message_type: TRANSACTION,
},
}
}
Expand Down
46 changes: 31 additions & 15 deletions rust/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ use crate::client::Client;
use crate::conf::{ClientOption, ProducerOption};
use crate::error::{ClientError, ErrorKind};
use crate::model::common::{ClientType, SendReceipt};
use crate::model::message;
use crate::model::message::{self, MessageTypeAware};
use crate::model::transaction::{Transaction, TransactionChecker, TransactionImpl};
use crate::pb::{Encoding, MessageType, Resource, SystemProperties};
use crate::pb::{Encoding, Resource, SystemProperties};
use crate::util::{
build_endpoints_by_message_queue, build_producer_settings, select_message_queue,
select_message_queue_by_message_group, HOST_NAME,
Expand Down Expand Up @@ -172,20 +172,16 @@ impl Producer {
.take_delivery_timestamp()
.map(|seconds| Timestamp { seconds, nanos: 0 });

let message_type = if message.transaction_enabled() {
if message.transaction_enabled() {
message_group = None;
delivery_timestamp = None;
MessageType::Transaction as i32
} else if delivery_timestamp.is_some() {
message_group = None;
MessageType::Delay as i32
} else if message_group.is_some() {
delivery_timestamp = None;
MessageType::Fifo as i32
} else {
MessageType::Normal as i32
};

// TODO: use a converter trait From or TryFrom
let pb_message = pb::Message {
topic: Some(Resource {
name: message.take_topic(),
Expand All @@ -198,7 +194,7 @@ impl Producer {
message_id: message.take_message_id(),
message_group,
delivery_timestamp,
message_type,
message_type: message.get_message_type() as i32,
born_host: HOST_NAME.clone(),
born_timestamp: born_timestamp.clone(),
body_digest: None,
Expand Down Expand Up @@ -241,6 +237,11 @@ impl Producer {
&self,
messages: Vec<impl message::Message>,
) -> Result<Vec<SendReceipt>, ClientError> {
let message_types = messages
.iter()
.map(|message| message.get_message_type())
.collect::<Vec<_>>();

let (topic, message_group, mut pb_messages) =
self.transform_messages_to_protobuf(messages)?;

Expand All @@ -252,6 +253,22 @@ impl Producer {
select_message_queue(route)
};

if self.option.validate_message_type() {
for message_type in message_types {
if !message_queue.accept_type(message_type) {
return Err(ClientError::new(
ErrorKind::MessageTypeNotMatch,
format!(
"Current message type {:?} not match with accepted types {:?}.",
message_type, message_queue.accept_message_types
)
.as_str(),
Self::OPERATION_SEND_MESSAGE,
));
}
}
}

let endpoints =
build_endpoints_by_message_queue(&message_queue, Self::OPERATION_SEND_MESSAGE)?;
for message in pb_messages.iter_mut() {
Expand Down Expand Up @@ -298,7 +315,7 @@ mod tests {
use crate::error::ErrorKind;
use crate::log::terminal_logger;
use crate::model::common::Route;
use crate::model::message::{MessageBuilder, MessageImpl};
use crate::model::message::{MessageBuilder, MessageImpl, MessageType};
use crate::model::transaction::TransactionResolution;
use crate::pb::{Broker, MessageQueue};
use crate::session::Session;
Expand Down Expand Up @@ -424,6 +441,7 @@ mod tests {
message_group: None,
delivery_timestamp: None,
transaction_enabled: false,
message_type: MessageType::TRANSACTION,
}];
let result = producer.transform_messages_to_protobuf(messages);
assert!(result.is_err());
Expand Down Expand Up @@ -491,7 +509,7 @@ mod tests {
addresses: vec![],
}),
}),
accept_message_types: vec![],
accept_message_types: vec![MessageType::NORMAL as i32],
}],
}))
});
Expand Down Expand Up @@ -539,7 +557,7 @@ mod tests {
addresses: vec![],
}),
}),
accept_message_types: vec![],
accept_message_types: vec![MessageType::TRANSACTION as i32],
}],
}))
});
Expand All @@ -563,9 +581,7 @@ mod tests {

let _ = producer
.send_transaction_message(
MessageBuilder::builder()
.set_topic("test_topic")
.set_body(vec![])
MessageBuilder::transaction_message_builder("test_topic", vec![])
.build()
.unwrap(),
)
Expand Down

0 comments on commit 65b70ed

Please sign in to comment.