Skip to content

Commit

Permalink
Merge pull request #300 from rafal-ch/add_reconnection_test
Browse files Browse the repository at this point in the history
Fix reconnection bug
  • Loading branch information
rafal-ch authored May 7, 2024
2 parents 8aa3710 + 23d8f4f commit e91a4e0
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 34 deletions.
24 changes: 23 additions & 1 deletion rpc_sidecar/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,30 @@ impl NodeClientConfig {
}
}

/// Creates an instance of `NodeClientConfig` with specified listening port.
#[cfg(any(feature = "testing", test))]
pub fn finite_retries_config(port: u16, num_of_retries: usize) -> Self {
pub fn new_with_port(port: u16) -> Self {
let local_socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
NodeClientConfig {
address: local_socket,
request_limit: DEFAULT_NODE_REQUEST_LIMIT,
max_message_size_bytes: DEFAULT_MAX_PAYLOAD_SIZE,
request_buffer_size: DEFAULT_REQUEST_BUFFER_SIZE,
message_timeout_secs: DEFAULT_MESSAGE_TIMEOUT_SECS,
client_access_timeout_secs: DEFAULT_CLIENT_ACCESS_TIMEOUT_SECS,
exponential_backoff: ExponentialBackoffConfig {
initial_delay_ms: DEFAULT_EXPONENTIAL_BACKOFF_BASE_MS,
max_delay_ms: DEFAULT_EXPONENTIAL_BACKOFF_MAX_MS,
coefficient: DEFAULT_EXPONENTIAL_BACKOFF_COEFFICIENT,
max_attempts: MaxAttempts::Infinite,
},
}
}

/// Creates an instance of `NodeClientConfig` with specified listening port and maximum number
/// of reconnection retries.
#[cfg(any(feature = "testing", test))]
pub fn new_with_port_and_retries(port: u16, num_of_retries: usize) -> Self {
let local_socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
NodeClientConfig {
address: local_socket,
Expand Down
112 changes: 92 additions & 20 deletions rpc_sidecar/src/node_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use std::{
};
use tokio::{
net::TcpStream,
sync::{Notify, RwLock, RwLockWriteGuard, Semaphore},
sync::{futures::Notified, RwLock, RwLockWriteGuard, Semaphore},
};
use tracing::{error, field, info, warn};

Expand Down Expand Up @@ -279,10 +279,35 @@ impl Error {
}
}

/// Marker type tagging the notifier that requests a reconnection (used as `Notify<Reconnect>`).
struct Reconnect;
/// Marker type tagging the notifier that requests a shutdown (used as `Notify<Shutdown>`).
struct Shutdown;

/// Thin, type-tagged wrapper around [`tokio::sync::Notify`].
///
/// The type parameter `T` is never stored. It only distinguishes notifiers by purpose at compile
/// time (e.g. `Notify<Reconnect>` vs. `Notify<Shutdown>`), so the compiler rejects a notifier
/// that is passed where one with a different tag is expected.
struct Notify<T> {
    /// The underlying tokio notifier all calls are delegated to.
    inner: tokio::sync::Notify,
    /// Zero-sized tag carrying the purpose marker `T`.
    phantom: std::marker::PhantomData<T>,
}

impl<T> Notify<T> {
    /// Creates a new notifier, already wrapped in an [`Arc`] so it can be shared.
    fn new() -> Arc<Self> {
        Arc::new(Self {
            inner: tokio::sync::Notify::new(),
            phantom: std::marker::PhantomData,
        })
    }

    /// Returns the future produced by the inner notifier's `notified()`.
    ///
    /// The future borrows `self`; the explicit `'_` makes that borrow visible in the signature
    /// instead of hiding it behind an elided path lifetime.
    fn notified(&self) -> Notified<'_> {
        self.inner.notified()
    }

    /// Delegates to the inner notifier's `notify_one()`.
    fn notify_one(&self) {
        self.inner.notify_one()
    }
}

pub struct FramedNodeClient {
client: Arc<RwLock<Framed<TcpStream, BinaryMessageCodec>>>,
reconnect: Arc<Notify>,
shutdown: Arc<Notify>,
reconnect: Arc<Notify<Reconnect>>,
shutdown: Arc<Notify<Shutdown>>,
config: NodeClientConfig,
request_limit: Semaphore,
}
Expand All @@ -292,14 +317,14 @@ impl FramedNodeClient {
config: NodeClientConfig,
) -> Result<(Self, impl Future<Output = Result<(), AnyhowError>>), AnyhowError> {
let stream = Arc::new(RwLock::new(Self::connect_with_retries(&config).await?));
let shutdown = Arc::new(Notify::new());
let reconnect = Arc::new(Notify::new());
let shutdown = Notify::<Shutdown>::new();
let reconnect = Notify::<Reconnect>::new();

let reconnect_loop = Self::reconnect_loop(
config.clone(),
Arc::clone(&stream),
Arc::clone(&reconnect),
Arc::clone(&shutdown),
Arc::clone(&reconnect),
);

Ok((
Expand All @@ -317,15 +342,15 @@ impl FramedNodeClient {
async fn reconnect_loop(
config: NodeClientConfig,
client: Arc<RwLock<Framed<TcpStream, BinaryMessageCodec>>>,
shutdown: Arc<Notify>,
reconnect: Arc<Notify>,
shutdown: Arc<Notify<Shutdown>>,
reconnect: Arc<Notify<Reconnect>>,
) -> Result<(), AnyhowError> {
loop {
tokio::select! {
_ = reconnect.notified() => {
let mut lock = client.write().await;
let new_client = Self::reconnect(&config.clone()).await?;
*lock = new_client;
let mut lock = client.write().await;
let new_client = Self::reconnect(&config.clone()).await?;
*lock = new_client;
},
_ = shutdown.notified() => {
info!("node client shutdown has been requested");
Expand Down Expand Up @@ -460,7 +485,7 @@ impl NodeClient for FramedNodeClient {

fn handle_response(
resp: BinaryResponseAndRequest,
shutdown: &Notify,
shutdown: &Notify<Shutdown>,
) -> Result<BinaryResponseAndRequest, Error> {
let version = resp.response().protocol_version();

Expand Down Expand Up @@ -565,7 +590,7 @@ mod tests {

#[tokio::test]
async fn should_reject_bad_major_version() {
let notify = Notify::new();
let notify = Notify::<Shutdown>::new();
let bad_version = ProtocolVersion::from_parts(10, 0, 0);

let result = handle_response(
Expand All @@ -582,7 +607,7 @@ mod tests {

#[tokio::test]
async fn should_accept_different_minor_version() {
let notify = Notify::new();
let notify = Notify::<Shutdown>::new();
let version = ProtocolVersion::new(SemVer {
minor: SUPPORTED_PROTOCOL_VERSION.value().minor + 1,
..SUPPORTED_PROTOCOL_VERSION.value()
Expand All @@ -608,7 +633,7 @@ mod tests {

#[tokio::test]
async fn should_accept_different_patch_version() {
let notify = Notify::new();
let notify = Notify::<Shutdown>::new();
let version = ProtocolVersion::new(SemVer {
patch: SUPPORTED_PROTOCOL_VERSION.value().patch + 1,
..SUPPORTED_PROTOCOL_VERSION.value()
Expand All @@ -634,7 +659,7 @@ mod tests {

#[tokio::test]
async fn given_client_and_no_node_should_fail_after_tries() {
let config = NodeClientConfig::finite_retries_config(1111, 2);
let config = NodeClientConfig::new_with_port_and_retries(1111, 2);
let res = FramedNodeClient::new(config).await;

assert!(res.is_err());
Expand All @@ -648,8 +673,10 @@ mod tests {
async fn given_client_and_node_should_connect_and_do_request() {
let port = get_port();
let mut rng = TestRng::new();
let _mock_server_handle = start_mock_binary_port_responding_with_stored_value(port).await;
let config = NodeClientConfig::finite_retries_config(port, 2);
let shutdown = Arc::new(tokio::sync::Notify::new());
let _mock_server_handle =
start_mock_binary_port_responding_with_stored_value(port, Arc::clone(&shutdown)).await;
let config = NodeClientConfig::new_with_port_and_retries(port, 2);
let (c, _) = FramedNodeClient::new(config).await.unwrap();

let res = query_global_state_for_string_value(&mut rng, &c)
Expand All @@ -663,12 +690,14 @@ mod tests {
async fn given_client_should_try_until_node_starts() {
let mut rng = TestRng::new();
let port = get_port();
let shutdown = Arc::new(tokio::sync::Notify::new());
tokio::spawn(async move {
sleep(Duration::from_secs(5)).await;
let _mock_server_handle =
start_mock_binary_port_responding_with_stored_value(port).await;
start_mock_binary_port_responding_with_stored_value(port, Arc::clone(&shutdown))
.await;
});
let config = NodeClientConfig::finite_retries_config(port, 5);
let config = NodeClientConfig::new_with_port_and_retries(port, 5);
let (client, _) = FramedNodeClient::new(config).await.unwrap();

let res = query_global_state_for_string_value(&mut rng, &client)
Expand All @@ -694,4 +723,47 @@ mod tests {
.ok_or(Error::NoResponseBody)
.map(|query_res| query_res.into_inner().0)
}

// Scenario test: a client that has lost its node connection should be able to serve requests
// again once a node is listening on the same port.
#[tokio::test]
async fn given_client_should_reconnect_to_restarted_node_and_do_request() {
let port = get_port();
let mut rng = TestRng::new();
// Handle used to ask the mock binary port to stop.
let shutdown = Arc::new(tokio::sync::Notify::new());
let mock_server_handle =
start_mock_binary_port_responding_with_stored_value(port, Arc::clone(&shutdown)).await;
// NOTE(review): `new_with_port` appears to configure an unlimited number of reconnection
// attempts (unlike `new_with_port_and_retries`) - confirm against `NodeClientConfig`.
let config = NodeClientConfig::new_with_port(port);
// The second tuple element is the reconnect-loop future; nothing polls it until the
// `select!` at the bottom of this test.
let (c, reconnect_loop) = FramedNodeClient::new(config).await.unwrap();

let scenario = async {
// 1. With the mock node up, a request is expected to succeed.
assert!(query_global_state_for_string_value(&mut rng, &c)
.await
.is_ok());

// 2. Ask the mock node to stop and wait for its task to finish.
shutdown.notify_one();
let _ = mock_server_handle.await;

// 3. With the node gone, the next request is expected to fail with "disconnected".
let err = query_global_state_for_string_value(&mut rng, &c)
.await
.unwrap_err();
assert!(matches!(
err,
Error::RequestFailed(e) if e == "disconnected"
));

// 4. Start a fresh mock node on the same port, reusing the same shutdown handle.
let _mock_server_handle =
start_mock_binary_port_responding_with_stored_value(port, Arc::clone(&shutdown))
.await;

// Presumably gives the reconnect loop time to re-establish the connection before the
// final request - TODO confirm that a fixed 2 s delay is sufficient on slow CI machines.
tokio::time::sleep(Duration::from_secs(2)).await;

// 5. After the restart, a request is expected to succeed again.
assert!(query_global_state_for_string_value(&mut rng, &c)
.await
.is_ok());
};

// Drive the scenario and the reconnect loop concurrently. The test ends when the scenario
// completes; if the reconnect loop finishes first, that is treated as a failure.
tokio::select! {
_ = scenario => (),
_ = reconnect_loop => panic!("reconnect loop should not exit"),
}
}
}
34 changes: 23 additions & 11 deletions rpc_sidecar/src/testing/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::sync::Arc;
use std::time::Duration;

use casper_binary_port::{
Expand All @@ -6,6 +7,7 @@ use casper_binary_port::{
};
use casper_types::{bytesrepr::ToBytes, CLValue, ProtocolVersion, StoredValue};
use futures::{SinkExt, StreamExt};
use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tokio::{
net::{TcpListener, TcpStream},
Expand All @@ -26,20 +28,27 @@ impl BinaryPortMock {
Self { port, response }
}

pub async fn start(&self) {
pub async fn start(&self, shutdown: Arc<Notify>) {
let port = self.port;
let addr = format!("{}:{}", LOCALHOST, port);
let listener = TcpListener::bind(addr.clone())
.await
.expect("failed to listen");
loop {
match listener.accept().await {
Ok((stream, _addr)) => {
let response_payload = self.response.clone();
tokio::spawn(handle_client(stream, response_payload));
tokio::select! {
_ = shutdown.notified() => {
break;
}
Err(io_err) => {
println!("acceptance failure: {:?}", io_err);
val = listener.accept() => {
match val {
Ok((stream, _addr)) => {
let response_payload = self.response.clone();
tokio::spawn(handle_client(stream, response_payload));
}
Err(io_err) => {
println!("acceptance failure: {:?}", io_err);
}
}
}
}
}
Expand All @@ -63,20 +72,23 @@ pub fn get_port() -> u16 {
portpicker::pick_unused_port().unwrap()
}

pub async fn start_mock_binary_port_responding_with_stored_value(port: u16) -> JoinHandle<()> {
pub async fn start_mock_binary_port_responding_with_stored_value(
port: u16,
shutdown: Arc<Notify>,
) -> JoinHandle<()> {
let value = StoredValue::CLValue(CLValue::from_t("Foo").unwrap());
let data = GlobalStateQueryResult::new(value, vec![]);
let protocol_version = ProtocolVersion::from_parts(2, 0, 0);
let val = BinaryResponse::from_value(data, protocol_version);
let request = [];
let response = BinaryResponseAndRequest::new(val, &request);
start_mock_binary_port(port, response.to_bytes().unwrap()).await
start_mock_binary_port(port, response.to_bytes().unwrap(), shutdown).await
}

async fn start_mock_binary_port(port: u16, data: Vec<u8>) -> JoinHandle<()> {
async fn start_mock_binary_port(port: u16, data: Vec<u8>, shutdown: Arc<Notify>) -> JoinHandle<()> {
let handler = tokio::spawn(async move {
let binary_port = BinaryPortMock::new(port, data);
binary_port.start().await;
binary_port.start(shutdown).await;
});
sleep(Duration::from_secs(3)).await; // This should be handled differently, preferably the mock binary port should inform that it already bound to the port
handler
Expand Down
8 changes: 6 additions & 2 deletions sidecar/src/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ impl Component for RpcApiComponent {

#[cfg(test)]
mod tests {
use std::sync::Arc;

use super::*;
use crate::config::SidecarConfig;
use casper_rpc_sidecar::{
Expand Down Expand Up @@ -355,11 +357,13 @@ mod tests {
#[tokio::test]
async fn given_rpc_api_server_component_when_config_should_return_some() {
let port = get_port();
let _mock_server_handle = start_mock_binary_port_responding_with_stored_value(port).await;
let shutdown = Arc::new(tokio::sync::Notify::new());
let _mock_server_handle =
start_mock_binary_port_responding_with_stored_value(port, Arc::clone(&shutdown)).await;
let component = RpcApiComponent::new();
let mut config = all_components_all_enabled();
config.rpc_server.as_mut().unwrap().node_client =
NodeClientConfig::finite_retries_config(port, 1);
NodeClientConfig::new_with_port_and_retries(port, 1);
config.rpc_server.as_mut().unwrap().main_server.address = format!("0.0.0.0:{}", port);
config
.rpc_server
Expand Down

0 comments on commit e91a4e0

Please sign in to comment.