Skip to content

Commit

Permalink
[rust]propagate error to upper layer when processing transaction comm…
Browse files Browse the repository at this point in the history
…ands
  • Loading branch information
glcrazier committed Feb 21, 2024
1 parent 41ca02c commit e03f30b
Showing 1 changed file with 40 additions and 18 deletions.
58 changes: 40 additions & 18 deletions rust/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,17 @@
* limitations under the License.
*/

use std::fmt::Debug;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use mockall_double::double;
use prost_types::Timestamp;
use slog::{error, info, warn, Logger};
use tokio::select;
use tokio::sync::RwLock;
use tokio::sync::{mpsc, oneshot};

#[double]
use crate::client::Client;
use crate::conf::{ClientOption, ProducerOption};
Expand All @@ -35,14 +44,7 @@ use crate::util::{
select_message_queue_by_message_group, HOST_NAME,
};
use crate::{log, pb};
use mockall_double::double;
use prost_types::Timestamp;
use slog::{error, info, warn, Logger};
use std::fmt::Debug;
use std::sync::Arc;
use tokio::select;
use tokio::sync::RwLock;
use tokio::sync::{mpsc, oneshot};

/// [`Producer`] is the core struct, to which application developers should turn, when publishing messages to RocketMQ proxy.
///
/// [`Producer`] is a thin wrapper of internal client struct that shoulders the actual workloads.
Expand Down Expand Up @@ -203,14 +205,33 @@ impl Producer {
endpoints: Endpoints,
) -> Result<(), ClientError> {
let transaction_id = command.transaction_id;
let message = command.message.unwrap();
let message_id = message
.system_properties
.as_ref()
.unwrap()
.message_id
.clone();
let topic = message.topic.as_ref().unwrap().clone();
let message = if let Some(message) = command.message {
message
} else {
return Err(ClientError::new(
ErrorKind::InvalidMessage,
"no message in command",
Self::OPERATION_END_TRANSACTION,
));
};
let message_id = if let Some(system_properties) = message.system_properties.as_ref() {
system_properties.message_id.clone()
} else {
return Err(ClientError::new(
ErrorKind::InvalidMessage,
"no message id exists",
Self::OPERATION_END_TRANSACTION,
));
};
let topic = if let Some(topic) = message.topic.as_ref() {
topic.clone()
} else {
return Err(ClientError::new(
ErrorKind::InvalidMessage,
"no topic exists in message",
Self::OPERATION_END_TRANSACTION,
));
};
if let Some(transaction_checker) = transaction_checker {
let resolution = transaction_checker(
transaction_id.clone(),
Expand All @@ -237,7 +258,7 @@ impl Producer {
}

fn handle_settings_command(settings: pb::Settings, option: &mut ProducerOption) {
if let PubSub::Publishing(publishing) = settings.pub_sub.unwrap() {
if let Some(PubSub::Publishing(publishing)) = settings.pub_sub {
option.set_validate_message_type(publishing.validate_message_type);
};
}
Expand Down Expand Up @@ -446,7 +467,6 @@ impl Producer {
mod tests {
use std::sync::Arc;

use super::*;
use crate::client::MockClient;
use crate::error::ErrorKind;
use crate::log::terminal_logger;
Expand All @@ -456,6 +476,8 @@ mod tests {
use crate::pb::{Broker, Code, EndTransactionResponse, MessageQueue, Status};
use crate::session::{self, Session};

use super::*;

fn new_producer_for_test() -> Producer {
Producer {
option: Default::default(),
Expand Down

0 comments on commit e03f30b

Please sign in to comment.