diff --git a/Cargo.lock b/Cargo.lock index ac55cc8c..063222d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -175,6 +175,16 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" +[[package]] +name = "assert-json-diff" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "async-compression" version = "0.3.15" @@ -444,6 +454,7 @@ dependencies = [ "eventsource-stream", "futures", "futures-util", + "mockito", "once_cell", "portpicker", "reqwest", @@ -651,9 +662,9 @@ dependencies = [ [[package]] name = "const-oid" -version = "0.9.5" +version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28c122c3980598d243d63d9a704629a2d748d101f278052ff068be5a4423ab6f" +checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" [[package]] name = "constant_time_eq" @@ -712,9 +723,9 @@ dependencies = [ [[package]] name = "crossbeam-queue" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1cfb3ea8a53f37c40dea2c7bedcbd88bdfae54f5e2175d6ecaff1c988353add" +checksum = "b9bcf5bdbfdd6030fb4a1c497b5d5fc5921aa2f60d359a17e249c0e6df3de153" dependencies = [ "cfg-if", "crossbeam-utils", @@ -722,9 +733,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.16" +version = "0.8.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" +checksum = "c06d96137f14f244c37f989d9fff8f95e6c18b918e71f36638f8c49112e4c78f" dependencies = [ "cfg-if", ] @@ -1386,9 +1397,9 @@ checksum = "b07f60793ff0a4d9cef0f18e63b5357e06209987153a64648c972c1e5aff336f" [[package]] name = "hkdf" -version = "0.12.3" +version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "791a029f6b9fc27657f6f188ec6e5e43f6911f6f878e0dc5501396e09809d437" +checksum = "7b5f8eb2ad728638ea2c7d47a21db23b7b58a72ed6a38256b8a1849f15fbbdf7" dependencies = [ "hmac 0.12.1", ] @@ -1414,11 +1425,11 @@ dependencies = [ [[package]] name = "home" -version = "0.5.5" +version = "0.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5444c27eef6923071f7ebcc33e3444508466a76f7a2b93da00ed6e19f30c1ddb" +checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -1463,9 +1474,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "hyper" -version = "0.14.27" +version = "0.14.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffb1cfd654a8219eaef89881fdb3bb3b1cdc5fa75ded05d6933b2b382e395468" +checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80" dependencies = [ "bytes", "futures-channel", @@ -1478,7 +1489,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2", "tokio", "tower-service", "tracing", @@ -1809,6 +1820,25 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "mockito" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8d3038e23466858569c2d30a537f691fa0d53b51626630ae08262943e3bbb8b" +dependencies = [ + "assert-json-diff", + "colored", + "futures", + "hyper", + "log", + "rand 0.8.5", + "regex", + "serde_json", + "serde_urlencoded", + "similar", + "tokio", +] + [[package]] name = "multer" version = "2.1.0" @@ -2582,9 +2612,9 @@ checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" [[package]] name = "reqwest" -version = "0.11.22" +version = "0.11.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" +checksum = "37b1ae8d9ac08420c66222fb9096fc5de435c3c48542bc5336c51892cffafb41" dependencies = [ "base64 0.21.5", "bytes", @@ -2849,9 +2879,9 @@ dependencies = [ [[package]] name = "sea-query" -version = "0.30.4" +version = "0.30.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41558fa9bb5f4d73952dac0b9d9c2ce23966493fc9ee0008037b01d709838a68" +checksum = "e40446e3c048cec0802375f52462a05cc774b9ea6af1dffba6c646b7825e4cf9" dependencies = [ "inherent", "sea-query-derive", @@ -3039,6 +3069,12 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "similar" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2aeaf503862c419d66959f5d7ca015337d864e9c49485d771b732e2a20453597" + [[package]] name = "slab" version = "0.4.9" @@ -3054,16 +3090,6 @@ version = "1.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970" -[[package]] -name = "socket2" -version = "0.4.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" -dependencies = [ - "libc", - "winapi", -] - [[package]] name = "socket2" version = "0.5.5" @@ -3557,18 +3583,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.50" +version = "1.0.51" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9a7210f5c9a7156bb50aa36aed4c95afb51df0df00713949448cf9e97d382d2" +checksum = "f11c217e1416d6f036b870f14e0413d480dbf28edbee1f877abaf0206af43bb7" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.50" +version = "1.0.51" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8" +checksum = "01742297787513b79cf8e29d1056ede1313e2420b7b3b15d0a768b4921f549df" dependencies = [ "proc-macro2 1.0.70", "quote 1.0.33", @@ -3607,9 +3633,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4a34ab300f2dee6e562c10a046fc05e358b29f9bf92277f30c3c8d82275f6f5" +checksum = "f657ba42c3f86e7680e53c8cd3af8abbe56b5491790b46e22e19c0d57463583e" dependencies = [ "deranged", "powerfmt", @@ -3626,9 +3652,9 @@ checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.15" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ad70d68dba9e1f8aceda7aa6711965dfec1cac869f311a51bd08b3a2ccbce20" +checksum = "26197e33420244aeb70c3e8c78376ca46571bc4e701e4791c2cd9f57dcb3a43f" dependencies = [ "time-core", ] @@ -3650,9 +3676,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.35.0" +version = "1.35.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "841d45b238a16291a4e1584e61820b8ae57d696cc5015c459c229ccc6990cc1c" +checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104" dependencies = [ "backtrace", "bytes", @@ -3662,7 +3688,7 @@ dependencies = [ "parking_lot 0.12.1", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.5", + "socket2", "tokio-macros", "windows-sys 0.48.0", ] @@ -4523,18 +4549,18 @@ checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec" [[package]] name = "zerocopy" -version = "0.7.30" +version = "0.7.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "306dca4455518f1f31635ec308b6b3e4eb1b11758cefafc782827d0aa7acb5c7" +checksum = "1c4061bedbb353041c12f413700357bec76df2c7e2ca8e4df8bac24c6bf68e3d" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.7.30" +version = "0.7.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be912bf68235a88fbefd1b73415cb218405958d1655b2ece9035a19920bdf6ba" +checksum = "b3c129550b3e6de3fd0ba67ba5c81818f9805e58b8d7fee80a3a59d2c9fc601a" dependencies = [ "proc-macro2 1.0.70", "quote 1.0.33", diff --git a/listener/Cargo.toml b/listener/Cargo.toml index ee3fa841..c79f9223 100644 --- a/listener/Cargo.toml +++ b/listener/Cargo.toml @@ -32,5 +32,6 @@ futures-util = { workspace = true } [dev-dependencies] casper-event-types = { path = "../types", version = "1.0.0", features = ["sse-data-testing"]} eventsource-stream = "0.2.3" +mockito = "1.2.0" portpicker = "0.1.1" warp = { version = "0.3.6"} diff --git a/listener/src/connection_manager.rs b/listener/src/connection_manager.rs index 60965b09..7eb4cde3 100644 --- a/listener/src/connection_manager.rs +++ b/listener/src/connection_manager.rs @@ -360,13 +360,19 @@ fn count_error(reason: &str) { #[cfg(test)] pub mod tests { + use super::ConnectionManager; use crate::{ connection_manager::{ConnectionManagerError, DefaultConnectionManager, FIRST_EVENT_EMPTY}, sse_connector::{tests::MockSseConnection, StreamConnector}, SseEvent, }; + use anyhow::Error; use casper_event_types::{sse_data::test_support::*, Filter}; - use tokio::sync::mpsc::{channel, Receiver}; + use std::time::Duration; + use tokio::{ + sync::mpsc::{channel, Receiver, Sender}, + time::sleep, + }; use url::Url; #[tokio::test] @@ -483,4 +489,52 @@ pub mod tests { }; (manager, data_rx, event_id_rx) } + + pub struct MockConnectionManager { + sender: Sender, + finish_after: Duration, + to_return: Option>, + msg: Option, + } + + impl MockConnectionManager { + pub fn new( + finish_after: Duration, + to_return: Result<(), ConnectionManagerError>, + sender: Sender, + msg: Option, + ) -> Self { + Self { + sender, + finish_after, + to_return: Some(to_return), + msg, + } + } + pub fn fail_fast(sender: Sender) -> Self { + let error = Error::msg("xyz"); + let a = Err(ConnectionManagerError::NonRecoverableError { error }); + Self::new(Duration::from_millis(1), a, sender, None) + } + + pub fn ok_long(sender: Sender, msg: Option<&str>) -> Self { + Self::new( + Duration::from_secs(10), + Ok(()), + sender, + msg.map(|s| s.to_string()), + ) + } + } + + #[async_trait::async_trait] + impl ConnectionManager for MockConnectionManager { + async fn start_handling(&mut self) -> Result<(), ConnectionManagerError> { + if let Some(msg) = &self.msg { + self.sender.send(msg.clone()).await.unwrap(); + } + sleep(self.finish_after).await; + self.to_return.take().unwrap() //Unwraping on purpose - this method should only be called once. + } + } } diff --git a/listener/src/connections_builder.rs b/listener/src/connections_builder.rs index 7ff8e326..c2f8158f 100644 --- a/listener/src/connections_builder.rs +++ b/listener/src/connections_builder.rs @@ -106,3 +106,158 @@ pub struct ConnectionConfig { pub ip_address: IpAddr, pub sse_port: u16, } + +#[cfg(test)] +pub mod tests { + use super::ConnectionsBuilder; + use crate::{ + connection_manager::{tests::MockConnectionManager, ConnectionManager}, + FilterWithEventId, + }; + use anyhow::Error; + use async_trait::async_trait; + use casper_event_types::Filter; + use casper_types::ProtocolVersion; + use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + }; + use tokio::sync::{ + mpsc::{channel, Receiver, Sender}, + Mutex, + }; + + pub type ResultsStoredInMock = Vec>, Error>>; + + pub struct MockConnectionsBuilder { + data_pushed_from_connections: Arc>>, + result: Mutex, + maybe_protocol_version: Mutex>, + } + + impl Default for MockConnectionsBuilder { + fn default() -> Self { + Self { + data_pushed_from_connections: Arc::new(Mutex::new(vec![])), + result: Mutex::new(vec![Ok(HashMap::new())]), + maybe_protocol_version: Mutex::new(None), + } + } + } + + impl MockConnectionsBuilder { + pub fn one_fails_second_is_ok() -> Self { + let (tx, rx) = channel(100); + let results = vec![ + response_with_failing_events("1", &tx), + response_with_all_connections_ok("2", &tx), + ]; + Self::builder_based_on_result(rx, results) + } + + pub fn ok_after_two_fails() -> Self { + let (tx, rx) = channel(100); + let results = vec![ + Err(Error::msg("Connection failed")), + Err(Error::msg("Connection failed 2")), + response_with_all_connections_ok("2", &tx), + ]; + Self::builder_based_on_result(rx, results) + } + + pub fn connection_fails() -> Self { + let (_, rx) = channel(100); + let results = vec![Err(Error::msg("Connection failed"))]; + Self::builder_based_on_result(rx, results) + } + + pub fn one_fails_immediatly() -> Self { + let (tx, rx) = channel(100); + let results = vec![response_with_failing_events("1", &tx)]; + Self::builder_based_on_result(rx, results) + } + + pub async fn get_received_data(&self) -> HashSet { + let data = self.data_pushed_from_connections.lock().await; + HashSet::from_iter(data.iter().cloned()) + } + + pub async fn get_recorded_protocol_version(&self) -> Option { + let data = self.maybe_protocol_version.lock().await; + *data + } + + fn builder_based_on_result(mut rx: Receiver, results: ResultsStoredInMock) -> Self { + let data_pushed_from_connections = Arc::new(Mutex::new(vec![])); + let data_pushed_from_connections_clone = data_pushed_from_connections.clone(); + tokio::spawn(async move { + while let Some(x) = rx.recv().await { + let mut v = data_pushed_from_connections_clone.lock().await; + v.push(x); + drop(v); + } + }); + Self { + data_pushed_from_connections, + result: Mutex::new(results), + maybe_protocol_version: Mutex::new(None), + } + } + } + + fn response_with_all_connections_ok( + msg_postfix: &str, + tx: &Sender, + ) -> Result>, Error> { + let events_msg = format!("events-{}", msg_postfix); + let events: Box = Box::new(MockConnectionManager::ok_long( + tx.clone(), + Some(events_msg.as_str()), + )); + let main_msg = format!("main-{}", msg_postfix); + let main: Box = Box::new(MockConnectionManager::ok_long( + tx.clone(), + Some(main_msg.as_str()), + )); + Ok(HashMap::from([ + (Filter::Events, events), + (Filter::Main, main), + ])) + } + + fn response_with_failing_events( + msg_postfix: &str, + tx: &Sender, + ) -> Result>, Error> { + let events: Box = + Box::new(MockConnectionManager::fail_fast(tx.clone())); + let main_msg = format!("main-{}", msg_postfix); + let main: Box = Box::new(MockConnectionManager::ok_long( + tx.clone(), + Some(main_msg.as_str()), + )); + Ok(HashMap::from([ + (Filter::Events, events), + (Filter::Main, main), + ])) + } + + #[async_trait] + impl ConnectionsBuilder for MockConnectionsBuilder { + async fn build_connections( + &self, + _last_event_id_for_filter: Arc>>, + _last_seen_event_id_sender: FilterWithEventId, + node_build_version: ProtocolVersion, + ) -> Result>, Error> { + let mut guard = self.maybe_protocol_version.lock().await; + *guard = Some(node_build_version); + drop(guard); + let mut guard = self.result.lock().await; + if !guard.is_empty() { + return guard.remove(0); + } + Err(Error::msg("No more connections to build")) + } + } +} diff --git a/listener/src/event_listener_status.rs b/listener/src/event_listener_status.rs new file mode 100644 index 00000000..85e494d9 --- /dev/null +++ b/listener/src/event_listener_status.rs @@ -0,0 +1,38 @@ +use casper_event_types::metrics; + +/// Helper enum determining in what state connection to a node is in. +/// It's used to named different situations in which the connection can be. +pub(super) enum EventListenerStatus { + /// Event Listener has not yet started to attempt the connection + Preparing, + /// Event Listener started establishing relevant sse connections to filters of the node + Connecting, + /// Event Listener got data from at least one of the nodes sse connections. + Connected, + /// For some reason Event Listener lost connection to the node and is trying to establish it again + Reconnecting, + /// If Event Listener reports this state it means that it was unable to establish a connection + /// with node and there are no more `max_connection_attempts` left. There will be no futhrer + /// tries to establish the connection. + Defunct, + /// If Event Listener reports this state it means that the node it was trying to connect to has a + /// version which sidecar can't work with + IncompatibleVersion, +} + +impl EventListenerStatus { + pub(super) fn log_status(&self, node_address: &str, sse_port: u16) { + let status = match self { + EventListenerStatus::Preparing => 0, + EventListenerStatus::Connecting => 1, + EventListenerStatus::Connected => 2, + EventListenerStatus::Reconnecting => 3, + EventListenerStatus::Defunct => -1, + EventListenerStatus::IncompatibleVersion => -2, + } as f64; + let node_label = format!("{}:{}", node_address, sse_port); + metrics::NODE_STATUSES + .with_label_values(&[node_label.as_str()]) + .set(status); + } +} diff --git a/listener/src/lib.rs b/listener/src/lib.rs index 14c8e07e..baf9dc05 100644 --- a/listener/src/lib.rs +++ b/listener/src/lib.rs @@ -5,18 +5,20 @@ mod connection_manager; mod connection_tasks; pub mod connections_builder; +mod event_listener_status; mod keep_alive_monitor; mod sse_connector; mod types; -use anyhow::{anyhow, Context, Error}; -use casper_event_types::{metrics, Filter}; +mod version_fetcher; +use crate::event_listener_status::*; +use anyhow::Error; +use casper_event_types::Filter; use casper_types::ProtocolVersion; use connection_manager::{ConnectionManager, ConnectionManagerError}; use connection_tasks::ConnectionTasks; use connections_builder::{ConnectionsBuilder, DefaultConnectionsBuilder}; use once_cell::sync::Lazy; -use serde_json::Value; -use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration}; +use std::{collections::HashMap, net::IpAddr, str::FromStr, sync::Arc, time::Duration}; use tokio::{ sync::{ mpsc::{self, Sender}, @@ -27,8 +29,8 @@ use tokio::{ use tracing::{debug, error, info, warn}; pub use types::{NodeConnectionInterface, SseEvent}; use url::Url; +use version_fetcher::{for_status_endpoint, BuildVersionFetchError, VersionFetcher}; -const BUILD_VERSION_KEY: &str = "build_version"; pub static MINIMAL_NODE_VERSION: Lazy = Lazy::new(|| ProtocolVersion::from_parts(1, 5, 2)); const MAX_CONNECTION_ATTEMPTS_REACHED: &str = "Max connection attempts reached"; @@ -48,6 +50,8 @@ type FilterWithEventId = Sender<(Filter, u32)>; type CurrentFilterToIdHolder = Arc>>; impl EventListenerBuilder { pub fn build(&self) -> Result { + let status_endpoint = status_endpoint(self.node.ip_address, self.node.rest_port)?; + let version_fetcher = Arc::new(for_status_endpoint(status_endpoint)); let connections_builder = Arc::new(DefaultConnectionsBuilder { sleep_between_keep_alive_checks: self.sleep_between_keep_alive_checks, no_message_timeout: self.no_message_timeout, @@ -64,6 +68,7 @@ impl EventListenerBuilder { max_connection_attempts: self.max_connection_attempts, delay_between_attempts: self.delay_between_attempts, allow_partial_connection: self.allow_partial_connection, + version_fetcher, connections_builder, }) } @@ -82,56 +87,17 @@ pub struct EventListener { /// If set to false, the listener needs to connect to all endpoints a node should expose in a given `node_build_version` for the listener to start processing data. /// If set to true the listen will proceed after connecting to at least one connection. allow_partial_connection: bool, + /// Fetches the build version of the node + version_fetcher: Arc, + /// Builder of the connections to the node connections_builder: Arc, } -/// Helper enum determining in what state connection to a node is in. -/// It's used to named different situations in which the connection can be. -pub enum EventListenerStatus { - /// Event Listener has not yet started to attempt the connection - Preparing, - /// Event Listener started establishing relevant sse connections to filters of the node - Connecting, - /// Event Listener got data from at least one of the nodes sse connections. - Connected, - /// For some reason Event Listener lost connection to the node and is trying to establish it again - Reconnecting, - /// If Event Listener reports this state it means that it was unable to establish a connection - /// with node and there are no more `max_connection_attempts` left. There will be no futhrer - /// tries to establish the connection. - Defunct, - /// If Event Listener reports this state it means that the node it was trying to connect to has a - /// version which sidecar can't work with - IncompatibleVersion, -} - -impl EventListenerStatus { - pub fn log_status(&self, node_address: &str, sse_port: u16) { - let status = match self { - EventListenerStatus::Preparing => 0, - EventListenerStatus::Connecting => 1, - EventListenerStatus::Connected => 2, - EventListenerStatus::Reconnecting => 3, - EventListenerStatus::Defunct => -1, - EventListenerStatus::IncompatibleVersion => -2, - } as f64; - let node_label = format!("{}:{}", node_address, sse_port); - metrics::NODE_STATUSES - .with_label_values(&[node_label.as_str()]) - .set(status); - } -} - enum ConnectOutcome { ConnectionLost, SystemReconnect, //In this case we don't increase the current_attempt counter } -enum BuildVersionFetchError { - Error(anyhow::Error), - VersionNotAcceptable(String), -} - enum GetVersionResult { Ok(Option), Retry, @@ -143,48 +109,6 @@ impl EventListener { self.node.clone() } - async fn fetch_build_version( - &self, - curent_protocol_version: ProtocolVersion, - current_attempt: usize, - ) -> Result, BuildVersionFetchError> { - info!( - "Attempting to connect {} \t{}/{}", - self.node.ip_address, current_attempt, self.max_connection_attempts - ); - match self.fetch_build_version_from_status().await { - Ok(version) => { - validate_version(&version).map_err(|err| { - log_status_for_event_listener(EventListenerStatus::IncompatibleVersion, self); - err - })?; - let new_node_build_version = version; - // Compare versions to reset attempts in the case that the version changed. - // This guards against endlessly retrying when the version hasn't changed, suggesting - // that the node is unavailable. - // If the connection has been failing and we see the build version change we reset - // the attempts as it may have down for an upgrade. - if new_node_build_version != curent_protocol_version { - return Ok(Some(new_node_build_version)); - } - Ok(Some(curent_protocol_version)) - } - Err(fetch_err) => { - error!( - "Error fetching build version (for {}): {fetch_err}", - self.node.ip_address - ); - if current_attempt >= self.max_connection_attempts { - log_status_for_event_listener(EventListenerStatus::Defunct, self); - return Err(BuildVersionFetchError::Error(Error::msg( - "Unable to retrieve build version from node status", - ))); - } - Ok(None) - } - } - } - /// Spins up the connections and starts pushing data from node pub async fn stream_aggregated_events(&mut self) -> Result<(), Error> { log_status_for_event_listener(EventListenerStatus::Preparing, self); @@ -211,16 +135,16 @@ impl EventListener { GetVersionResult::Error(e) => return Err(e), _ => {} } - if let ConnectOutcome::ConnectionLost = self + if let Ok(ConnectOutcome::ConnectionLost) = self .do_connect( last_event_id_for_filter.clone(), last_seen_event_id_sender.clone(), ) - .await? + .await { - current_attempt += 1; warn_connection_lost(self, current_attempt); } + current_attempt += 1; } log_status_for_event_listener(EventListenerStatus::Defunct, self); Err(Error::msg(MAX_CONNECTION_ATTEMPTS_REACHED)) @@ -311,49 +235,14 @@ impl EventListener { } } - fn status_endpoint(&self) -> Result { - let status_endpoint_str = format!( - "http://{}:{}/status", - self.node.ip_address, self.node.rest_port - ); - Url::from_str(&status_endpoint_str).map_err(Error::from) - } - - // Fetch the build version by requesting the status from the node's rest server. - async fn fetch_build_version_from_status(&self) -> Result { - debug!( - "Fetching build version for {}", - self.status_endpoint().unwrap() - ); - - let status_endpoint = self - .status_endpoint() - .context("Should have created status URL")?; - - let status_response = reqwest::get(status_endpoint) - .await - .context("Should have responded with status")?; - - // The exact structure of the response varies over the versions but there is an assertion made - // that .build_version is always valid. So the key is accessed without deserialising the full response. - let response_json: Value = status_response - .json() - .await - .context("Should have parsed JSON from response")?; - - try_resolve_version(response_json) - } - async fn get_version(&mut self, current_attempt: usize) -> GetVersionResult { info!( "Attempting to connect...\t{}/{}", current_attempt, self.max_connection_attempts ); - let fetch_result = self - .fetch_build_version(self.node_build_version, current_attempt) - .await; + let fetch_result = self.version_fetcher.fetch().await; match fetch_result { - Ok(Some(new_node_build_version)) => { + Ok(new_node_build_version) => { if self.node_build_version != new_node_build_version { return GetVersionResult::Ok(Some(new_node_build_version)); } @@ -364,7 +253,13 @@ impl EventListener { //The node has a build version which sidecar can't talk to. Failing fast in this case. GetVersionResult::Error(Error::msg(msg)) } - _ => GetVersionResult::Retry, + Err(BuildVersionFetchError::Error(err)) => { + error!( + "Error fetching build version (for {}): {err}", + self.node.ip_address + ); + GetVersionResult::Retry + } } } @@ -411,61 +306,15 @@ fn start_connections( .collect() } -fn try_resolve_version(raw_response: Value) -> Result { - match raw_response.get(BUILD_VERSION_KEY) { - Some(build_version_value) if build_version_value.is_string() => { - let raw = build_version_value - .as_str() - .context("build_version_value should be a string") - .map_err(|e| { - count_error("version_value_not_a_string"); - e - })? - .split('-') - .next() - .context("splitting build_version_value should always return at least one slice") - .map_err(|e| { - count_error("incomprehensible_build_version_form"); - e - })?; - ProtocolVersion::from_str(raw).map_err(|error| { - count_error("failed_parsing_protocol_version"); - anyhow!("failed parsing build version from '{}': {}", raw, error) - }) - } - _ => { - count_error("failed_getting_status_from_payload"); - Err(anyhow!( - "failed to get {} from status response {}", - BUILD_VERSION_KEY, - raw_response - )) - } - } -} - fn log_status_for_event_listener(status: EventListenerStatus, event_listener: &EventListener) { let node_address = event_listener.node.ip_address.to_string(); let sse_port = event_listener.node.sse_port; status.log_status(node_address.as_str(), sse_port); } -fn count_error(reason: &str) { - casper_event_types::metrics::ERROR_COUNTS - .with_label_values(&["event_listener", reason]) - .inc(); -} - -fn validate_version(version: &ProtocolVersion) -> Result<(), BuildVersionFetchError> { - if version.lt(&MINIMAL_NODE_VERSION) { - let msg = format!( - "Node version expected to be >= {}.", - MINIMAL_NODE_VERSION.value(), - ); - Err(BuildVersionFetchError::VersionNotAcceptable(msg)) - } else { - Ok(()) - } +fn status_endpoint(ip_address: IpAddr, rest_port: u16) -> Result { + let status_endpoint_str = format!("http://{}:{}/status", ip_address, rest_port); + Url::from_str(&status_endpoint_str).map_err(Error::from) } fn warn_connection_lost(listener: &EventListener, current_attempt: usize) { @@ -474,3 +323,98 @@ fn warn_connection_lost(listener: &EventListener, current_attempt: usize) { listener.node.ip_address, current_attempt, listener.max_connection_attempts ); } + +#[cfg(test)] +mod tests { + use crate::{ + connections_builder::tests::MockConnectionsBuilder, + version_fetcher::{tests::MockVersionFetcher, BuildVersionFetchError}, + EventListener, NodeConnectionInterface, + }; + use anyhow::Error; + use casper_types::ProtocolVersion; + use std::{collections::HashSet, sync::Arc, time::Duration}; + + #[tokio::test] + async fn given_event_listener_should_not_connect_when_incompatible_version() { + let version_fetcher = MockVersionFetcher::new(vec![Err( + BuildVersionFetchError::VersionNotAcceptable("1.5.10".to_string()), + )]); + let connections_builder = Arc::new(MockConnectionsBuilder::default()); + + let err = run_event_listener(2, version_fetcher, connections_builder.clone(), true).await; + + assert!(err.to_string().contains("1.5.10")); + } + + #[tokio::test] + async fn given_event_listener_should_fail_when_one_connection_manager_fails_other_does_not() { + let version_fetcher = MockVersionFetcher::repeatable_from_protocol_version("1.5.10"); + let connections_builder = Arc::new(MockConnectionsBuilder::one_fails_immediatly()); + + let err = run_event_listener(1, version_fetcher, connections_builder.clone(), true).await; + + let received_data = connections_builder.get_received_data().await; + assert_eq!(received_data.len(), 1); + assert!(set_contains(received_data, vec!["main-1"],)); + assert!(err.to_string().contains("Max connection attempts reached")); + } + + #[tokio::test] + async fn given_event_listener_should_fail_if_connection_fails() { + let version_fetcher = MockVersionFetcher::repeatable_from_protocol_version("1.5.10"); + let connections_builder = Arc::new(MockConnectionsBuilder::connection_fails()); + + let err = run_event_listener(1, version_fetcher, connections_builder.clone(), true).await; + + assert!(err.to_string().contains("Max connection attempts reached")); + let received_data = connections_builder.get_received_data().await; + assert!(received_data.is_empty()); + } + + #[tokio::test] + async fn given_event_listener_should_fetch_data_if_enough_reconnections() { + let version_fetcher = MockVersionFetcher::repeatable_from_protocol_version("1.5.10"); + let connections_builder = Arc::new(MockConnectionsBuilder::ok_after_two_fails()); + + let err = run_event_listener(3, version_fetcher, connections_builder.clone(), true).await; + + let received_data = connections_builder.get_received_data().await; + assert_eq!(received_data.len(), 2); + assert!(set_contains(received_data, vec!["main-2", "events-2"],)); + assert!(err.to_string().contains("Max connection attempts reached")); + } + + #[tokio::test] + async fn given_event_listener_should_give_up_retrying_if_runs_out() { + let version_fetcher = MockVersionFetcher::repeatable_from_protocol_version("1.5.10"); + let connections_builder = Arc::new(MockConnectionsBuilder::ok_after_two_fails()); + + let err = run_event_listener(2, version_fetcher, connections_builder.clone(), true).await; + assert!(err.to_string().contains("Max connection attempts reached")); + let received_data = connections_builder.get_received_data().await; + assert!(received_data.is_empty()); + } + + async fn run_event_listener( + max_connection_attempts: usize, + version_fetcher: MockVersionFetcher, + connections_builder: Arc, + allow_partial_connection: bool, + ) -> Error { + let mut listener = EventListener { + node_build_version: ProtocolVersion::from_parts(1, 0, 0), + node: NodeConnectionInterface::default(), + max_connection_attempts, + delay_between_attempts: Duration::from_secs(1), + allow_partial_connection, + version_fetcher: Arc::new(version_fetcher), + connections_builder, + }; + listener.stream_aggregated_events().await.unwrap_err() + } + + fn set_contains(set: HashSet, value: Vec<&str>) -> bool { + value.iter().all(|v| set.contains(*v)) + } +} diff --git a/listener/src/types.rs b/listener/src/types.rs index bc7ee32c..dd39c63c 100644 --- a/listener/src/types.rs +++ b/listener/src/types.rs @@ -13,6 +13,17 @@ pub struct NodeConnectionInterface { pub rest_port: u16, } +#[cfg(test)] +impl Default for NodeConnectionInterface { + fn default() -> Self { + Self { + ip_address: "127.0.0.1".parse().unwrap(), + sse_port: 100, + rest_port: 200, + } + } +} + /// Data fot from sse connection to node which sidecar cares about. pub struct SseEvent { /// Id of the message diff --git a/listener/src/version_fetcher.rs b/listener/src/version_fetcher.rs new file mode 100644 index 00000000..cbfe9d88 --- /dev/null +++ b/listener/src/version_fetcher.rs @@ -0,0 +1,228 @@ +use anyhow::{anyhow, Context, Error}; +use async_trait::async_trait; +use casper_types::ProtocolVersion; +use once_cell::sync::Lazy; +use serde_json::Value; +use std::str::FromStr; +use tracing::debug; +use url::Url; + +const BUILD_VERSION_KEY: &str = "build_version"; + +pub static MINIMAL_NODE_VERSION: Lazy = + Lazy::new(|| ProtocolVersion::from_parts(1, 5, 2)); + +#[derive(Debug)] +pub enum BuildVersionFetchError { + Error(anyhow::Error), + VersionNotAcceptable(String), +} + +#[cfg(test)] +impl Clone for BuildVersionFetchError { + fn clone(&self) -> Self { + match self { + Self::Error(err) => Self::Error(Error::msg(err.to_string())), + Self::VersionNotAcceptable(arg0) => Self::VersionNotAcceptable(arg0.clone()), + } + } +} + +#[async_trait] +pub trait VersionFetcher: Sync + Send { + async fn fetch(&self) -> Result; +} +pub fn for_status_endpoint(status_endpoint: Url) -> impl VersionFetcher { + StatusEndpointVersionFetcher { status_endpoint } +} + +#[derive(Clone)] +pub struct StatusEndpointVersionFetcher { + status_endpoint: Url, +} + +#[async_trait] +impl VersionFetcher for StatusEndpointVersionFetcher { + async fn fetch(&self) -> Result { + let status_endpoint = self.status_endpoint.clone(); + debug!("Fetching build version for {}", status_endpoint); + match fetch_build_version_from_status(status_endpoint).await { + Ok(version) => { + validate_version(&version)?; + Ok(version) + } + Err(fetch_err) => Err(BuildVersionFetchError::Error(fetch_err)), + } + } +} + +// Fetch the build version by requesting the status from the node's rest server. +async fn fetch_build_version_from_status(status_endpoint: Url) -> Result { + let status_response = reqwest::get(status_endpoint) + .await + .context("Should have responded with status")?; + + // The exact structure of the response varies over the versions but there is an assertion made + // that .build_version is always valid. So the key is accessed without deserialising the full response. + let response_json: Value = status_response + .json() + .await + .context("Should have parsed JSON from response")?; + + try_resolve_version(response_json) +} + +fn validate_version(version: &ProtocolVersion) -> Result<(), BuildVersionFetchError> { + if version.lt(&MINIMAL_NODE_VERSION) { + let msg = format!( + "Node version expected to be >= {}.", + MINIMAL_NODE_VERSION.value(), + ); + Err(BuildVersionFetchError::VersionNotAcceptable(msg)) + } else { + Ok(()) + } +} + +fn try_resolve_version(raw_response: Value) -> Result { + match raw_response.get(BUILD_VERSION_KEY) { + Some(build_version_value) if build_version_value.is_string() => { + let raw = build_version_value + .as_str() + .context("build_version_value should be a string") + .map_err(|e| { + count_error("version_value_not_a_string"); + e + })? + .split('-') + .next() + .context("splitting build_version_value should always return at least one slice") + .map_err(|e| { + count_error("incomprehensible_build_version_form"); + e + })?; + ProtocolVersion::from_str(raw).map_err(|error| { + count_error("failed_parsing_protocol_version"); + anyhow!("failed parsing build version from '{}': {}", raw, error) + }) + } + _ => { + count_error("failed_getting_status_from_payload"); + Err(anyhow!( + "failed to get {} from status response {}", + BUILD_VERSION_KEY, + raw_response + )) + } + } +} + +fn count_error(reason: &str) { + casper_event_types::metrics::ERROR_COUNTS + .with_label_values(&["fetching_build_version_for_node", reason]) + .inc(); +} + +#[cfg(test)] +pub mod tests { + use super::*; + use casper_types::{ProtocolVersion, SemVer}; + use mockito::{Mock, Server, ServerGuard}; + use serde_json::json; + use tokio::sync::Mutex; + + #[tokio::test] + async fn try_resolve_version_should_interpret_cortest_by_build_versionrect_build_version() { + let mut protocol = test_by_build_version(Some("5.1.111-b94c4f79a")) + .await + .unwrap(); + assert_eq!(protocol, ProtocolVersion::new(SemVer::new(5, 1, 111))); + + protocol = test_by_build_version(Some("6.2.112-b94c4f79a-casper-mainnet")) + .await + .unwrap(); + assert_eq!(protocol, ProtocolVersion::new(SemVer::new(6, 2, 112))); + + protocol = test_by_build_version(Some("7.3.113")).await.unwrap(); + assert_eq!(protocol, ProtocolVersion::new(SemVer::new(7, 3, 113))); + + let version_validation_failed = test_by_build_version(Some("1.5.1")).await; + assert!(matches!( + version_validation_failed, + Err(BuildVersionFetchError::VersionNotAcceptable(_)) + )); + } + + #[tokio::test] + async fn try_resolve_should_fail_if_build_version_is_absent() { + let ret = test_by_build_version(None).await; + assert!(ret.is_err()); + } + + #[tokio::test] + async fn try_resolve_should_fail_if_build_version_is_invalid() { + let ret = test_by_build_version(Some("not-a-semver")).await; + assert!(ret.is_err()); + } + + fn build_server_mock(build_version: Option<&str>) -> (Mock, String, ServerGuard) { + let mut server = Server::new(); + let url = format!("{}/status", server.url()); + let json_object = match build_version { + Some(version) => json!({ BUILD_VERSION_KEY: version }), + None => json!({}), + }; + let raw_json = json_object.to_string(); + let mock = server + .mock("GET", "/status") + .with_status(201) + .with_header("content-type", "application/json") + .with_body(raw_json) + .create(); + //We need to return the server guard here otherwise it will get dropped and the mock won't work correctly. + (mock, url, server) + } + + async fn test_by_build_version( + build_version: Option<&str>, + ) -> Result { + let (mock, url, _server) = build_server_mock(build_version); + let result = for_status_endpoint(Url::parse(&url).unwrap()).fetch().await; + mock.assert(); + result + } + + pub struct MockVersionFetcher { + repeatable: bool, + version_responses: Mutex>>, + } + + impl MockVersionFetcher { + pub fn repeatable_from_protocol_version(version: &str) -> Self { + let protocol_version = ProtocolVersion::from_str(version).unwrap(); + Self { + repeatable: true, + version_responses: Mutex::new(vec![Ok(protocol_version)]), + } + } + pub fn new( + version_responses: Vec>, + ) -> Self { + Self { + repeatable: false, + version_responses: Mutex::new(version_responses), + } + } + } + + #[async_trait] + impl VersionFetcher for MockVersionFetcher { + async fn fetch(&self) -> Result { + let mut version_responses = self.version_responses.lock().await; + if self.repeatable { + return version_responses[0].clone(); + } + version_responses.pop().unwrap() //If we are fetching something that wasn't prepared it should be an error + } + } +}