Skip to content

Commit

Permalink
[rust]support handling settings command in Producer
Browse files Browse the repository at this point in the history
  • Loading branch information
glcrazier committed Feb 21, 2024
1 parent 65b70ed commit 41ca02c
Show file tree
Hide file tree
Showing 4 changed files with 275 additions and 176 deletions.
2 changes: 1 addition & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ minitrace = "0.4"
byteorder = "1"
mac_address = "1.1.4"
hex = "0.4.3"
time = "0.3"
time = { version = "0.3", features = ["local-offset"] }
once_cell = "1.18.0"

mockall = "0.11.4"
Expand Down
153 changes: 21 additions & 132 deletions rust/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,14 @@ use tokio::sync::{mpsc, oneshot};
use crate::conf::ClientOption;
use crate::error::{ClientError, ErrorKind};
use crate::model::common::{ClientType, Endpoints, Route, RouteStatus, SendReceipt};
use crate::model::message::{AckMessageEntry, MessageView};
use crate::model::transaction::{TransactionChecker, TransactionResolution};
use crate::model::message::AckMessageEntry;
use crate::pb;
use crate::pb::receive_message_response::Content;
use crate::pb::telemetry_command::Command::{RecoverOrphanedTransactionCommand, Settings};
use crate::pb::{
AckMessageRequest, AckMessageResultEntry, ChangeInvisibleDurationRequest, Code,
EndTransactionRequest, FilterExpression, HeartbeatRequest, HeartbeatResponse, Message,
MessageQueue, NotifyClientTerminationRequest, QueryRouteRequest, ReceiveMessageRequest,
Resource, SendMessageRequest, Status, TelemetryCommand, TransactionSource,
FilterExpression, HeartbeatRequest, HeartbeatResponse, Message, MessageQueue,
NotifyClientTerminationRequest, QueryRouteRequest, ReceiveMessageRequest, Resource,
SendMessageRequest, Status, TelemetryCommand,
};
#[double]
use crate::session::SessionManager;
Expand All @@ -54,7 +52,6 @@ pub(crate) struct Client {
id: String,
access_endpoints: Endpoints,
settings: TelemetryCommand,
transaction_checker: Option<Box<TransactionChecker>>,
telemetry_command_tx: Option<mpsc::Sender<pb::telemetry_command::Command>>,
shutdown_tx: Option<oneshot::Sender<()>>,
}
Expand All @@ -70,8 +67,6 @@ const OPERATION_HEARTBEAT: &str = "client.heartbeat";
const OPERATION_SEND_MESSAGE: &str = "client.send_message";
const OPERATION_RECEIVE_MESSAGE: &str = "client.receive_message";
const OPERATION_ACK_MESSAGE: &str = "client.ack_message";
const OPERATION_END_TRANSACTION: &str = "client.end_transaction";
const OPERATION_HANDLE_TELEMETRY_COMMAND: &str = "client.handle_telemetry_command";

impl Debug for Client {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand Down Expand Up @@ -102,28 +97,23 @@ impl Client {
id,
access_endpoints: endpoints,
settings,
transaction_checker: None,
telemetry_command_tx: None,
shutdown_tx: None,
})
}

pub(crate) fn is_started(&self) -> bool {
self.shutdown_tx.is_some()
pub(crate) fn get_endpoints(&self) -> Endpoints {
self.access_endpoints.clone()
}

pub(crate) fn has_transaction_checker(&self) -> bool {
self.transaction_checker.is_some()
}

pub(crate) fn set_transaction_checker(&mut self, transaction_checker: Box<TransactionChecker>) {
if self.is_started() {
panic!("client {} is started, can not be modified", self.id)
}
self.transaction_checker = Some(transaction_checker);
pub(crate) fn is_started(&self) -> bool {
self.shutdown_tx.is_some()
}

pub(crate) async fn start(&mut self) -> Result<(), ClientError> {
pub(crate) async fn start(
&mut self,
telemetry_command_tx: mpsc::Sender<pb::telemetry_command::Command>,
) -> Result<(), ClientError> {
let logger = self.logger.clone();
let session_manager = self.session_manager.clone();

Expand All @@ -134,19 +124,12 @@ impl Client {
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
self.shutdown_tx = Some(shutdown_tx);

// send heartbeat and handle telemetry command
let (telemetry_command_tx, mut telemetry_command_rx) = mpsc::channel(16);
self.telemetry_command_tx = Some(telemetry_command_tx);

let rpc_client = self
.get_session()
.await
.map_err(|error| error.with_operation(OPERATION_CLIENT_START))?;
let endpoints = self.access_endpoints.clone();
let transaction_checker = self.transaction_checker.take();
// give a placeholder
if transaction_checker.is_some() {
self.transaction_checker = Some(Box::new(|_, _| TransactionResolution::UNKNOWN));
}

tokio::spawn(async move {
rpc_client.is_started();
Expand Down Expand Up @@ -188,24 +171,13 @@ impl Client {
debug!(logger,"send heartbeat to server success, peer={}",peer);
}
},
command = telemetry_command_rx.recv() => {
if let Some(command) = command {
let result = Self::handle_telemetry_command(rpc_client.shadow_session(), &transaction_checker, endpoints.clone(), command).await;
if let Err(error) = result {
error!(logger, "handle telemetry command failed: {:?}", error);
}
}
},
_ = &mut shutdown_rx => {
info!(logger, "receive shutdown signal, stop heartbeat task and telemetry command handler");
info!(logger, "receive shutdown signal, stop heartbeat task.");
break;
}
}
}
info!(
logger,
"heartbeat task and telemetry command handler are stopped"
);
info!(logger, "heartbeat task is stopped");
});
Ok(())
}
Expand Down Expand Up @@ -239,58 +211,6 @@ impl Client {
Ok(())
}

async fn handle_telemetry_command<T: RPCClient + 'static>(
mut rpc_client: T,
transaction_checker: &Option<Box<TransactionChecker>>,
endpoints: Endpoints,
command: pb::telemetry_command::Command,
) -> Result<(), ClientError> {
return match command {
RecoverOrphanedTransactionCommand(command) => {
let transaction_id = command.transaction_id;
let message = command.message.unwrap();
let message_id = message
.system_properties
.as_ref()
.unwrap()
.message_id
.clone();
let topic = message.topic.as_ref().unwrap().clone();
if let Some(transaction_checker) = transaction_checker {
let resolution = transaction_checker(
transaction_id.clone(),
MessageView::from_pb_message(message, endpoints),
);

let response = rpc_client
.end_transaction(EndTransactionRequest {
topic: Some(topic),
message_id: message_id.to_string(),
transaction_id,
resolution: resolution as i32,
source: TransactionSource::SourceServerCheck as i32,
trace_context: "".to_string(),
})
.await?;
Self::handle_response_status(response.status, OPERATION_END_TRANSACTION)
} else {
Err(ClientError::new(
ErrorKind::Config,
"failed to get transaction checker",
OPERATION_END_TRANSACTION,
))
}
}
Settings(_) => Ok(()),
_ => Err(ClientError::new(
ErrorKind::Config,
"receive telemetry command but there is no handler",
OPERATION_HANDLE_TELEMETRY_COMMAND,
)
.with_context("command", format!("{:?}", command))),
};
}

pub(crate) fn client_id(&self) -> &str {
&self.id
}
Expand Down Expand Up @@ -704,13 +624,11 @@ pub(crate) mod tests {
use crate::error::{ClientError, ErrorKind};
use crate::log::terminal_logger;
use crate::model::common::{ClientType, Route};
use crate::model::transaction::TransactionResolution;
use crate::pb::receive_message_response::Content;
use crate::pb::{
AckMessageEntry, AckMessageResponse, ChangeInvisibleDurationResponse, Code,
EndTransactionResponse, FilterExpression, HeartbeatResponse, Message, MessageQueue,
QueryRouteResponse, ReceiveMessageResponse, Resource, SendMessageResponse, Status,
SystemProperties, TelemetryCommand,
FilterExpression, HeartbeatResponse, Message, MessageQueue, QueryRouteResponse,
ReceiveMessageResponse, Resource, SendMessageResponse, Status, TelemetryCommand,
};
use crate::session;

Expand All @@ -731,7 +649,6 @@ pub(crate) mod tests {
id: Client::generate_client_id(),
access_endpoints: Endpoints::from_url("http://localhost:8081").unwrap(),
settings: TelemetryCommand::default(),
transaction_checker: None,
telemetry_command_tx: None,
shutdown_tx: None,
}
Expand All @@ -747,7 +664,6 @@ pub(crate) mod tests {
id: Client::generate_client_id(),
access_endpoints: Endpoints::from_url("http://localhost:8081").unwrap(),
settings: TelemetryCommand::default(),
transaction_checker: None,
telemetry_command_tx: Some(tx),
shutdown_tx: None,
}
Expand Down Expand Up @@ -784,7 +700,8 @@ pub(crate) mod tests {
.returning(|_, _, _| Ok(Session::mock()));

let mut client = new_client_with_session_manager(session_manager);
client.start().await?;
let (tx, _) = mpsc::channel(16);
client.start(tx).await?;

// TODO use countdown latch instead sleeping
// wait for run
Expand All @@ -800,7 +717,8 @@ pub(crate) mod tests {
.returning(|_, _, _| Ok(Session::mock()));

let mut client = new_client_with_session_manager(session_manager);
let _ = client.start().await;
let (tx, _rx) = mpsc::channel(16);
let _ = client.start(tx).await;
let result = client.get_session().await;
assert!(result.is_ok());
let result = client
Expand Down Expand Up @@ -1134,33 +1052,4 @@ pub(crate) mod tests {
assert_eq!(error.message, "server return an error");
assert_eq!(error.operation, "client.ack_message");
}

#[tokio::test]
async fn client_handle_telemetry_command() {
let response = Ok(EndTransactionResponse {
status: Some(Status {
code: Code::Ok as i32,
message: "".to_string(),
}),
});
let mut mock = session::MockRPCClient::new();
mock.expect_end_transaction()
.return_once(|_| Box::pin(futures::future::ready(response)));
let result = Client::handle_telemetry_command(
mock,
&Some(Box::new(|_, _| TransactionResolution::COMMIT)),
Endpoints::from_url("localhost:8081").unwrap(),
RecoverOrphanedTransactionCommand(pb::RecoverOrphanedTransactionCommand {
message: Some(Message {
topic: Some(Resource::default()),
user_properties: Default::default(),
system_properties: Some(SystemProperties::default()),
body: vec![],
}),
transaction_id: "".to_string(),
}),
)
.await;
assert!(result.is_ok())
}
}
Loading

0 comments on commit 41ca02c

Please sign in to comment.