Skip to content

Commit

Permalink
Changing internal structure of EventListener so it can be unit teste
Browse files Browse the repository at this point in the history
  • Loading branch information
Jakub Zajkowski committed Dec 12, 2023
1 parent 584cb5c commit f87bfda
Show file tree
Hide file tree
Showing 14 changed files with 942 additions and 418 deletions.
36 changes: 36 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions listener/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,6 @@ futures-util = { workspace = true }
[dev-dependencies]
casper-event-types = { path = "../types", version = "1.0.0", features = ["sse-data-testing"]}
eventsource-stream = "0.2.3"
mockito = "1.2.0"
portpicker = "0.1.1"
warp = { version = "0.3.6"}
115 changes: 90 additions & 25 deletions listener/src/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
SseEvent,
};
use anyhow::Error;
use async_trait::async_trait;
use casper_event_types::{
metrics,
sse_data::{deserialize, SseData},
Expand All @@ -21,16 +22,27 @@ use tokio::sync::mpsc::Sender;
use tokio_stream::StreamExt;
use tracing::{error, trace, warn};

const API_VERSION: &str = "ApiVersion";
const FETCHING_FROM_STREAM_FAILED: &str = "fetching_from_stream_failed";
const FIRST_EVENT_EMPTY: &str = "First event was empty";
const ERROR_WHEN_TRYING_TO_SEND_MESSAGE: &str =
"Error when trying to send message in ConnectionManager#handle_event";
const DESERIALIZATION_ERROR: &str = "deserialization_error";
const EVENT_WITHOUT_ID: &str = "event_without_id";
const SENDING_FAILED: &str = "sending_downstream_failed";
const API_VERSION_SENDING_FAILED: &str = "api_version_sending_failed";
const API_VERSION_DESERIALIZATION_FAILED: &str = "api_version_deserialization_failed";
const API_VERSION_EXPECTED: &str = "api_version_expected";
const OTHER_TYPE_OF_MESSAGE_WHEN_API_VERSION_EXPECTED: &str =
"When trying to deserialize ApiVersion got other type of message";

#[async_trait]
pub trait ConnectionManager: Sync + Send {
async fn start_handling(&mut self) -> Result<(), ConnectionManagerError>;
}

/// Implementation of a connection to a single sse endpoint of a node.
pub(super) struct ConnectionManager {
pub struct DefaultConnectionManager {
connector: Box<dyn StreamConnector + Send + Sync>,
bind_address: Url,
current_event_id: Option<u32>,
Expand Down Expand Up @@ -60,7 +72,7 @@ impl Display for ConnectionManagerError {
}

/// Builder for [ConnectionManager]
pub struct ConnectionManagerBuilder {
pub struct DefaultConnectionManagerBuilder {
/// Address of the node
pub(super) bind_address: Url,
/// Maximum attempts the connection manager will try to (initially) connect.
Expand Down Expand Up @@ -89,8 +101,14 @@ pub struct ConnectionManagerBuilder {
pub(super) no_message_timeout: Duration,
}

impl ConnectionManagerBuilder {
pub(super) fn build(self) -> ConnectionManager {
#[async_trait::async_trait]
impl ConnectionManager for DefaultConnectionManager {
async fn start_handling(&mut self) -> Result<(), ConnectionManagerError> {
self.do_start_handling().await
}
}
impl DefaultConnectionManagerBuilder {
pub(super) fn build(self) -> DefaultConnectionManager {
trace!("Creating connection manager for: {}", self.bind_address);
let connector = Box::new(SseConnection {
max_attempts: self.max_attempts,
Expand All @@ -100,7 +118,7 @@ impl ConnectionManagerBuilder {
sleep_between_keepalive_checks: self.sleep_between_keep_alive_checks,
no_message_timeout: self.no_message_timeout,
});
ConnectionManager {
DefaultConnectionManager {
connector,
bind_address: self.bind_address,
current_event_id: self.start_from_event_id,
Expand All @@ -112,12 +130,9 @@ impl ConnectionManagerBuilder {
}
}

impl ConnectionManager {
impl DefaultConnectionManager {
/// Start handling traffic from nodes endpoint. This function is blocking, it will return a
/// ConnectionManagerError result if something went wrong while processing.
pub(super) async fn start_handling(&mut self) -> Result<(), ConnectionManagerError> {
self.do_start_handling().await
}
async fn connect(
&mut self,
Expand Down Expand Up @@ -184,7 +199,7 @@ impl ConnectionManager {
Err(parse_error) => {
// ApiVersion events have no ID so parsing "" to u32 will fail.
// This gate saves displaying a warning for a trivial error.
if !event.data.contains("ApiVersion") {
if !event.data.contains(API_VERSION) {
count_error(EVENT_WITHOUT_ID);
warn!("Parse Error: {}", parse_error);
}
Expand Down Expand Up @@ -229,9 +244,7 @@ impl ConnectionManager {
);
self.sse_event_sender.send(sse_event).await.map_err(|_| {
count_error(SENDING_FAILED);
Error::msg(
"Error when trying to send message in ConnectionManager#handle_event",
)
Error::msg(ERROR_WHEN_TRYING_TO_SEND_MESSAGE)
})?;
}
}
Expand All @@ -246,12 +259,12 @@ impl ConnectionManager {
// We want to see if the first message got from a connection is ApiVersion. That is the protocols guarantee.
// If it's not - something went very wrong and we shouldn't consider this connection valid
match receiver.next().await {
None => Err(recoverable_error(Error::msg("First event was empty"))),
None => Err(recoverable_error(Error::msg(FIRST_EVENT_EMPTY))),
Some(Err(error)) => Err(failed_to_get_first_event(error)),
Some(Ok(event)) => {
let payload_size = event.data.len();
self.observe_bytes(payload_size);
if event.data.contains("ApiVersion") {
if event.data.contains(API_VERSION) {
self.try_handle_api_version_message(&event, receiver).await
} else {
Err(expected_first_message_to_be_api_version(event.data))
Expand Down Expand Up @@ -279,15 +292,13 @@ impl ConnectionManager {
);
self.sse_event_sender.send(sse_event).await.map_err(|_| {
count_error(API_VERSION_SENDING_FAILED);
non_recoverable_error(Error::msg(
"Error when trying to send message in ConnectionManager#handle_event",
))
non_recoverable_error(Error::msg(ERROR_WHEN_TRYING_TO_SEND_MESSAGE))
})?
}
Ok(_sse_data) => {
count_error(API_VERSION_EXPECTED);
return Err(non_recoverable_error(Error::msg(
"When trying to deserialize ApiVersion got other type of message",
OTHER_TYPE_OF_MESSAGE_WHEN_API_VERSION_EXPECTED,
)));
}
Err(x) => {
Expand Down Expand Up @@ -347,14 +358,20 @@ fn count_error(reason: &str) {
}

#[cfg(test)]
mod tests {
pub mod tests {
use super::ConnectionManager;
use crate::{
connection_manager::{ConnectionManager, ConnectionManagerError},
connection_manager::{ConnectionManagerError, DefaultConnectionManager, FIRST_EVENT_EMPTY},
sse_connector::{tests::MockSseConnection, StreamConnector},
SseEvent,
};
use anyhow::Error;
use casper_event_types::{sse_data::test_support::*, Filter};
use tokio::sync::mpsc::{channel, Receiver};
use std::time::Duration;
use tokio::{
sync::mpsc::{channel, Receiver, Sender},
time::sleep,
};
use url::Url;

#[tokio::test]
Expand All @@ -375,7 +392,7 @@ mod tests {
let (mut connection_manager, _, _) = build_manager(connector);
let res = connection_manager.do_start_handling().await;
if let Err(ConnectionManagerError::InitialConnectionError { error }) = res {
assert_eq!(error.to_string(), "First event was empty");
assert_eq!(error.to_string(), FIRST_EVENT_EMPTY);
} else {
unreachable!();
}
Expand Down Expand Up @@ -453,14 +470,14 @@ mod tests {
fn build_manager(
connector: Box<dyn StreamConnector + Send + Sync>,
) -> (
ConnectionManager,
DefaultConnectionManager,
Receiver<SseEvent>,
Receiver<(Filter, u32)>,
) {
let bind_address = Url::parse("http://localhost:123").unwrap();
let (data_tx, data_rx) = channel(100);
let (event_id_tx, event_id_rx) = channel(100);
let manager = ConnectionManager {
let manager = DefaultConnectionManager {
connector,
bind_address,
current_event_id: None,
Expand All @@ -471,4 +488,52 @@ mod tests {
};
(manager, data_rx, event_id_rx)
}

pub struct MockConnectionManager {
sender: Sender<String>,
finish_after: Duration,
to_return: Option<Result<(), ConnectionManagerError>>,
msg: Option<String>,
}

impl MockConnectionManager {
pub fn new(
finish_after: Duration,
to_return: Result<(), ConnectionManagerError>,
sender: Sender<String>,
msg: Option<String>,
) -> Self {
Self {
sender,
finish_after,
to_return: Some(to_return),
msg,
}
}
pub fn fail_fast(sender: Sender<String>) -> Self {
let error = Error::msg("xyz");
let a = Err(ConnectionManagerError::NonRecoverableError { error });
Self::new(Duration::from_millis(1), a, sender, None)
}

pub fn ok_long(sender: Sender<String>, msg: Option<&str>) -> Self {
Self::new(
Duration::from_secs(10),
Ok(()),
sender,
msg.map(|s| s.to_string()),
)
}
}

#[async_trait::async_trait]
impl ConnectionManager for MockConnectionManager {
async fn start_handling(&mut self) -> Result<(), ConnectionManagerError> {
if let Some(msg) = &self.msg {
self.sender.send(msg.clone()).await.unwrap();
}
sleep(self.finish_after).await;
self.to_return.take().unwrap() //Unwraping on purpose - this method should only be called once.
}
}
}
2 changes: 1 addition & 1 deletion listener/src/connection_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tokio::sync::Notify;
/// failure to connect to any filter should cause all connections to fail without reading any events
/// from the stream(s).
#[derive(Clone)]
pub(super) struct ConnectionTasks {
pub struct ConnectionTasks {
/// The total number filters to which the [ConnectionManager](super::ConnectionManager) is attempting to connect.
total: usize,
/// The number of filters to which successful connections have been established.
Expand Down
Loading

0 comments on commit f87bfda

Please sign in to comment.