diff --git a/rpc_sidecar/src/config.rs b/rpc_sidecar/src/config.rs
index 6df2b677..482df230 100644
--- a/rpc_sidecar/src/config.rs
+++ b/rpc_sidecar/src/config.rs
@@ -163,8 +163,30 @@ impl NodeClientConfig {
         }
     }
 
+    /// Creates an instance of `NodeClientConfig` with specified listening port.
     #[cfg(any(feature = "testing", test))]
-    pub fn finite_retries_config(port: u16, num_of_retries: usize) -> Self {
+    pub fn new_with_port(port: u16) -> Self {
+        let local_socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
+        NodeClientConfig {
+            address: local_socket,
+            request_limit: DEFAULT_NODE_REQUEST_LIMIT,
+            max_message_size_bytes: DEFAULT_MAX_PAYLOAD_SIZE,
+            request_buffer_size: DEFAULT_REQUEST_BUFFER_SIZE,
+            message_timeout_secs: DEFAULT_MESSAGE_TIMEOUT_SECS,
+            client_access_timeout_secs: DEFAULT_CLIENT_ACCESS_TIMEOUT_SECS,
+            exponential_backoff: ExponentialBackoffConfig {
+                initial_delay_ms: DEFAULT_EXPONENTIAL_BACKOFF_BASE_MS,
+                max_delay_ms: DEFAULT_EXPONENTIAL_BACKOFF_MAX_MS,
+                coefficient: DEFAULT_EXPONENTIAL_BACKOFF_COEFFICIENT,
+                max_attempts: MaxAttempts::Infinite,
+            },
+        }
+    }
+
+    /// Creates an instance of `NodeClientConfig` with specified listening port and maximum number
+    /// of reconnection retries.
+    #[cfg(any(feature = "testing", test))]
+    pub fn new_with_port_and_retries(port: u16, num_of_retries: usize) -> Self {
         let local_socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
         NodeClientConfig {
             address: local_socket,
diff --git a/rpc_sidecar/src/node_client.rs b/rpc_sidecar/src/node_client.rs
index e2ee8e52..6bc7e322 100644
--- a/rpc_sidecar/src/node_client.rs
+++ b/rpc_sidecar/src/node_client.rs
@@ -30,7 +30,7 @@ use std::{
 };
 use tokio::{
     net::TcpStream,
-    sync::{Notify, RwLock, RwLockWriteGuard, Semaphore},
+    sync::{futures::Notified, RwLock, RwLockWriteGuard, Semaphore},
 };
 use tracing::{error, field, info, warn};
 
@@ -279,10 +279,35 @@ impl Error {
     }
 }
 
+struct Reconnect;
+struct Shutdown;
+
+struct Notify<T> {
+    inner: tokio::sync::Notify,
+    phantom: std::marker::PhantomData<T>,
+}
+
+impl<T> Notify<T> {
+    fn new() -> Arc<Self> {
+        Arc::new(Self {
+            inner: tokio::sync::Notify::new(),
+            phantom: std::marker::PhantomData,
+        })
+    }
+
+    fn notified(&self) -> Notified {
+        self.inner.notified()
+    }
+
+    fn notify_one(&self) {
+        self.inner.notify_one()
+    }
+}
+
 pub struct FramedNodeClient {
     client: Arc<RwLock<Framed<TcpStream, BinaryMessageCodec>>>,
-    reconnect: Arc<Notify>,
-    shutdown: Arc<Notify>,
+    reconnect: Arc<Notify<Reconnect>>,
+    shutdown: Arc<Notify<Shutdown>>,
     config: NodeClientConfig,
     request_limit: Semaphore,
 }
@@ -292,14 +317,14 @@ impl FramedNodeClient {
         config: NodeClientConfig,
     ) -> Result<(Self, impl Future<Output = Result<(), AnyhowError>>), AnyhowError> {
         let stream = Arc::new(RwLock::new(Self::connect_with_retries(&config).await?));
-        let shutdown = Arc::new(Notify::new());
-        let reconnect = Arc::new(Notify::new());
+        let shutdown = Notify::<Shutdown>::new();
+        let reconnect = Notify::<Reconnect>::new();
 
         let reconnect_loop = Self::reconnect_loop(
             config.clone(),
             Arc::clone(&stream),
-            Arc::clone(&reconnect),
             Arc::clone(&shutdown),
+            Arc::clone(&reconnect),
         );
 
         Ok((
@@ -317,15 +342,15 @@
     async fn reconnect_loop(
         config: NodeClientConfig,
         client: Arc<RwLock<Framed<TcpStream, BinaryMessageCodec>>>,
-        shutdown: Arc<Notify>,
-        reconnect: Arc<Notify>,
+        shutdown: Arc<Notify<Shutdown>>,
+        reconnect: Arc<Notify<Reconnect>>,
     ) -> Result<(), AnyhowError> {
         loop {
             tokio::select! {
                 _ = reconnect.notified() => {
-                    let mut lock = client.write().await;
-                    let new_client = Self::reconnect(&config.clone()).await?;
-                    *lock = new_client;
+                    let mut lock = client.write().await;
+                    let new_client = Self::reconnect(&config.clone()).await?;
+                    *lock = new_client;
                 },
                 _ = shutdown.notified() => {
                     info!("node client shutdown has been requested");
@@ -460,7 +485,7 @@ impl NodeClient for FramedNodeClient {
 
 fn handle_response(
     resp: BinaryResponseAndRequest,
-    shutdown: &Notify,
+    shutdown: &Notify<Shutdown>,
 ) -> Result<BinaryResponse, Error> {
     let version = resp.response().protocol_version();
 
@@ -565,7 +590,7 @@ mod tests {
 
     #[tokio::test]
     async fn should_reject_bad_major_version() {
-        let notify = Notify::new();
+        let notify = Notify::<Shutdown>::new();
         let bad_version = ProtocolVersion::from_parts(10, 0, 0);
 
         let result = handle_response(
@@ -582,7 +607,7 @@ mod tests {
 
     #[tokio::test]
     async fn should_accept_different_minor_version() {
-        let notify = Notify::new();
+        let notify = Notify::<Shutdown>::new();
         let version = ProtocolVersion::new(SemVer {
             minor: SUPPORTED_PROTOCOL_VERSION.value().minor + 1,
             ..SUPPORTED_PROTOCOL_VERSION.value()
@@ -608,7 +633,7 @@ mod tests {
 
     #[tokio::test]
     async fn should_accept_different_patch_version() {
-        let notify = Notify::new();
+        let notify = Notify::<Shutdown>::new();
         let version = ProtocolVersion::new(SemVer {
             patch: SUPPORTED_PROTOCOL_VERSION.value().patch + 1,
             ..SUPPORTED_PROTOCOL_VERSION.value()
@@ -634,7 +659,7 @@ mod tests {
 
     #[tokio::test]
     async fn given_client_and_no_node_should_fail_after_tries() {
-        let config = NodeClientConfig::finite_retries_config(1111, 2);
+        let config = NodeClientConfig::new_with_port_and_retries(1111, 2);
         let res = FramedNodeClient::new(config).await;
 
         assert!(res.is_err());
@@ -648,8 +673,10 @@ mod tests {
     async fn given_client_and_node_should_connect_and_do_request() {
         let port = get_port();
         let mut rng = TestRng::new();
-        let _mock_server_handle = start_mock_binary_port_responding_with_stored_value(port).await;
-        let config = NodeClientConfig::finite_retries_config(port, 2);
+        let shutdown = Arc::new(tokio::sync::Notify::new());
+        let _mock_server_handle =
+            start_mock_binary_port_responding_with_stored_value(port, Arc::clone(&shutdown)).await;
+        let config = NodeClientConfig::new_with_port_and_retries(port, 2);
         let (c, _) = FramedNodeClient::new(config).await.unwrap();
 
         let res = query_global_state_for_string_value(&mut rng, &c)
@@ -663,12 +690,14 @@ mod tests {
     async fn given_client_should_try_until_node_starts() {
         let mut rng = TestRng::new();
         let port = get_port();
+        let shutdown = Arc::new(tokio::sync::Notify::new());
         tokio::spawn(async move {
             sleep(Duration::from_secs(5)).await;
             let _mock_server_handle =
-                start_mock_binary_port_responding_with_stored_value(port).await;
+                start_mock_binary_port_responding_with_stored_value(port, Arc::clone(&shutdown))
+                    .await;
         });
-        let config = NodeClientConfig::finite_retries_config(port, 5);
+        let config = NodeClientConfig::new_with_port_and_retries(port, 5);
         let (client, _) = FramedNodeClient::new(config).await.unwrap();
 
         let res = query_global_state_for_string_value(&mut rng, &client)
@@ -694,4 +723,47 @@ mod tests {
             .ok_or(Error::NoResponseBody)
             .map(|query_res| query_res.into_inner().0)
     }
+
+    #[tokio::test]
+    async fn given_client_should_reconnect_to_restarted_node_and_do_request() {
+        let port = get_port();
+        let mut rng = TestRng::new();
+        let shutdown = Arc::new(tokio::sync::Notify::new());
+        let mock_server_handle =
+            start_mock_binary_port_responding_with_stored_value(port, Arc::clone(&shutdown)).await;
+        let config = NodeClientConfig::new_with_port(port);
+        let (c, reconnect_loop) = FramedNodeClient::new(config).await.unwrap();
+
+        let scenario = async {
+            assert!(query_global_state_for_string_value(&mut rng, &c)
+                .await
+                .is_ok());
+
+            shutdown.notify_one();
+            let _ = mock_server_handle.await;
+
+            let err = query_global_state_for_string_value(&mut rng, &c)
+                .await
+                .unwrap_err();
+            assert!(matches!(
+                err,
+                Error::RequestFailed(e) if e == "disconnected"
+            ));
+
+            let _mock_server_handle =
+                start_mock_binary_port_responding_with_stored_value(port, Arc::clone(&shutdown))
+                    .await;
+
+            tokio::time::sleep(Duration::from_secs(2)).await;
+
+            assert!(query_global_state_for_string_value(&mut rng, &c)
+                .await
+                .is_ok());
+        };
+
+        tokio::select! {
+            _ = scenario => (),
+            _ = reconnect_loop => panic!("reconnect loop should not exit"),
+        }
+    }
 }
diff --git a/rpc_sidecar/src/testing/mod.rs b/rpc_sidecar/src/testing/mod.rs
index 5f0cd45c..f8d9ce60 100644
--- a/rpc_sidecar/src/testing/mod.rs
+++ b/rpc_sidecar/src/testing/mod.rs
@@ -1,3 +1,4 @@
+use std::sync::Arc;
 use std::time::Duration;
 
 use casper_binary_port::{
@@ -6,6 +7,7 @@ use casper_binary_port::{
 };
 use casper_types::{bytesrepr::ToBytes, CLValue, ProtocolVersion, StoredValue};
 use futures::{SinkExt, StreamExt};
+use tokio::sync::Notify;
 use tokio::task::JoinHandle;
 use tokio::{
     net::{TcpListener, TcpStream},
@@ -26,20 +28,27 @@ impl BinaryPortMock {
         Self { port, response }
     }
-    pub async fn start(&self) {
+    pub async fn start(&self, shutdown: Arc<Notify>) {
         let port = self.port;
         let addr = format!("{}:{}", LOCALHOST, port);
         let listener = TcpListener::bind(addr.clone())
             .await
             .expect("failed to listen");
         loop {
-            match listener.accept().await {
-                Ok((stream, _addr)) => {
-                    let response_payload = self.response.clone();
-                    tokio::spawn(handle_client(stream, response_payload));
+            tokio::select! {
+                _ = shutdown.notified() => {
+                    break;
                 }
-                Err(io_err) => {
-                    println!("acceptance failure: {:?}", io_err);
+                val = listener.accept() => {
+                    match val {
+                        Ok((stream, _addr)) => {
+                            let response_payload = self.response.clone();
+                            tokio::spawn(handle_client(stream, response_payload));
+                        }
+                        Err(io_err) => {
+                            println!("acceptance failure: {:?}", io_err);
+                        }
+                    }
                 }
             }
         }
     }
@@ -63,20 +72,23 @@ pub fn get_port() -> u16 {
     portpicker::pick_unused_port().unwrap()
 }
 
-pub async fn start_mock_binary_port_responding_with_stored_value(port: u16) -> JoinHandle<()> {
+pub async fn start_mock_binary_port_responding_with_stored_value(
+    port: u16,
+    shutdown: Arc<Notify>,
+) -> JoinHandle<()> {
     let value = StoredValue::CLValue(CLValue::from_t("Foo").unwrap());
     let data = GlobalStateQueryResult::new(value, vec![]);
     let protocol_version = ProtocolVersion::from_parts(2, 0, 0);
     let val = BinaryResponse::from_value(data, protocol_version);
     let request = [];
     let response = BinaryResponseAndRequest::new(val, &request);
-    start_mock_binary_port(port, response.to_bytes().unwrap()).await
+    start_mock_binary_port(port, response.to_bytes().unwrap(), shutdown).await
 }
 
-async fn start_mock_binary_port(port: u16, data: Vec<u8>) -> JoinHandle<()> {
+async fn start_mock_binary_port(port: u16, data: Vec<u8>, shutdown: Arc<Notify>) -> JoinHandle<()> {
     let handler = tokio::spawn(async move {
         let binary_port = BinaryPortMock::new(port, data);
-        binary_port.start().await;
+        binary_port.start(shutdown).await;
     });
     sleep(Duration::from_secs(3)).await; // This should be handled differently, preferably the mock binary port should inform that it already bound to the port
     handler
diff --git a/sidecar/src/component.rs b/sidecar/src/component.rs
index 6e9242b8..aea1a451 100644
--- a/sidecar/src/component.rs
+++ b/sidecar/src/component.rs
@@ -234,6 +234,8 @@ impl Component for RpcApiComponent {
 
 #[cfg(test)]
 mod tests {
+    use std::sync::Arc;
+
     use super::*;
     use crate::config::SidecarConfig;
     use casper_rpc_sidecar::{
@@ -355,11 +357,13 @@ mod tests {
     #[tokio::test]
     async fn given_rpc_api_server_component_when_config_should_return_some() {
         let port = get_port();
-        let _mock_server_handle = start_mock_binary_port_responding_with_stored_value(port).await;
+        let shutdown = Arc::new(tokio::sync::Notify::new());
+        let _mock_server_handle =
+            start_mock_binary_port_responding_with_stored_value(port, Arc::clone(&shutdown)).await;
         let component = RpcApiComponent::new();
         let mut config = all_components_all_enabled();
         config.rpc_server.as_mut().unwrap().node_client =
-            NodeClientConfig::finite_retries_config(port, 1);
+            NodeClientConfig::new_with_port_and_retries(port, 1);
         config.rpc_server.as_mut().unwrap().main_server.address = format!("0.0.0.0:{}", port);
         config
             .rpc_server
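
Editor's note (not part of the patch): the node_client.rs change above replaces tokio's untyped `Notify` with a zero-cost typed wrapper, so the shutdown and reconnect channels become distinct types and cannot be swapped at a call site — which is why the argument reorder in the `Self::reconnect_loop(...)` call is now compiler-checked. Below is a minimal, self-contained sketch of that pattern, assuming tokio with the `sync`, `rt-multi-thread`, and `macros` features; `demo_loop` and the `main` wiring are illustrative only and not sidecar code.

use std::{marker::PhantomData, sync::Arc};

use tokio::sync::futures::Notified;

// Zero-sized marker types, one per notification channel.
struct Reconnect;
struct Shutdown;

// Typed wrapper around `tokio::sync::Notify`. The `PhantomData<T>` field is
// erased at compile time, so the wrapper costs nothing at runtime, but it
// makes `Notify<Reconnect>` and `Notify<Shutdown>` distinct types.
struct Notify<T> {
    inner: tokio::sync::Notify,
    phantom: PhantomData<T>,
}

impl<T> Notify<T> {
    fn new() -> Arc<Self> {
        Arc::new(Self {
            inner: tokio::sync::Notify::new(),
            phantom: PhantomData,
        })
    }

    fn notified(&self) -> Notified<'_> {
        self.inner.notified()
    }

    fn notify_one(&self) {
        self.inner.notify_one()
    }
}

// Same shape as `reconnect_loop` in the patch: wait on both channels and
// react to whichever fires. Swapping the two arguments at a call site is
// now a type error instead of a silent bug.
async fn demo_loop(shutdown: Arc<Notify<Shutdown>>, reconnect: Arc<Notify<Reconnect>>) {
    loop {
        tokio::select! {
            _ = reconnect.notified() => println!("reconnect requested"),
            _ = shutdown.notified() => {
                println!("shutdown requested");
                break;
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let shutdown = Notify::<Shutdown>::new();
    let reconnect = Notify::<Reconnect>::new();

    let task = tokio::spawn(demo_loop(Arc::clone(&shutdown), Arc::clone(&reconnect)));

    // `notify_one` stores a permit if no task is waiting yet, so these wake
    // the loop even if it has not reached its `select!` when they run.
    reconnect.notify_one();
    shutdown.notify_one();
    task.await.unwrap();
}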