Skip to content

Commit

Permalink
Bringing back /main /deployss /sigs events to sidecars SSE server. Fo…
Browse files Browse the repository at this point in the history
…r now it provides 2.x SSE events.
  • Loading branch information
Jakub Zajkowski committed Apr 19, 2024
1 parent c7988fa commit 81ea06d
Show file tree
Hide file tree
Showing 4 changed files with 271 additions and 45 deletions.
8 changes: 6 additions & 2 deletions event_sidecar/src/event_stream_server/endpoint.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
#[cfg(test)]
use std::fmt::{Display, Formatter};

/// Enum representing all possible endpoints sidecar can have.
#[derive(Hash, Eq, PartialEq, Debug, Clone)]
pub enum Endpoint {
Events,
Main,
Deploys,
Sigs,
Sidecar,
}

#[cfg(test)]
impl Display for Endpoint {
/// This implementation is for test only and created to mimick how Display is implemented for Filter.
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Endpoint::Events => write!(f, "events"),
Endpoint::Main => write!(f, "events/main"),
Endpoint::Deploys => write!(f, "events/deploys"),
Endpoint::Sigs => write!(f, "events/sigs"),
Endpoint::Sidecar => write!(f, "events/sidecar"),
}
}
Expand Down
136 changes: 125 additions & 11 deletions event_sidecar/src/event_stream_server/sse_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ use warp::{

/// The URL root path.
pub const SSE_API_ROOT_PATH: &str = "events";

/// The URL path part to subscribe to "backwards compatible" 'main' event stream.
/// It will check for events from the nodes firehose and those which can be translated to 1.x format will be translated.
pub const SSE_API_MAIN_PATH: &str = "main";
/// The URL path part to subscribe to only `DeployAccepted` events.
pub const SSE_API_DEPLOYS_PATH: &str = "deploys";
/// The URL path part to subscribe to only `FinalitySignature` events.
pub const SSE_API_SIGNATURES_PATH: &str = "sigs";

/// The URL path part to subscribe to all events other than `TransactionAccepted`s and
/// `FinalitySignature`s.
/// The URL path part to subscribe to sidecar specific events.
Expand All @@ -53,6 +62,21 @@ const EVENTS_FILTER: [EventFilter; 8] = [
EventFilter::FinalitySignature,
EventFilter::Step,
];
/// The filter associated with `/events/main` path.
const MAIN_FILTER: [EventFilter; 6] = [
EventFilter::ApiVersion,
EventFilter::BlockAdded,
EventFilter::TransactionProcessed,
EventFilter::TransactionExpired,
EventFilter::Fault,
EventFilter::Step,
];
/// The filter associated with `/events/deploys` path.
const DEPLOYS_FILTER: [EventFilter; 2] =
[EventFilter::ApiVersion, EventFilter::TransactionAccepted];
/// The filter associated with `/events/sigs` path.
const SIGNATURES_FILTER: [EventFilter; 2] =
[EventFilter::ApiVersion, EventFilter::FinalitySignature];
/// The filter associated with `/events/sidecar` path.
const SIDECAR_FILTER: [EventFilter; 1] = [EventFilter::SidecarVersion];
/// The "id" field of the events sent on the event stream to clients.
Expand Down Expand Up @@ -254,6 +278,9 @@ fn build_event_for_outbound(
pub(super) fn path_to_filter(path_param: &str) -> Option<&'static Endpoint> {
match path_param {
SSE_API_ROOT_PATH => Some(&Endpoint::Events),
SSE_API_MAIN_PATH => Some(&Endpoint::Main),
SSE_API_DEPLOYS_PATH => Some(&Endpoint::Deploys),
SSE_API_SIGNATURES_PATH => Some(&Endpoint::Sigs),
SSE_API_SIDECAR_PATH => Some(&Endpoint::Sidecar),
_ => None,
}
Expand All @@ -262,6 +289,9 @@ pub(super) fn path_to_filter(path_param: &str) -> Option<&'static Endpoint> {
pub(super) fn get_filter(path_param: &str) -> Option<&'static [EventFilter]> {
match path_param {
SSE_API_ROOT_PATH => Some(&EVENTS_FILTER[..]),
SSE_API_MAIN_PATH => Some(&MAIN_FILTER[..]),
SSE_API_DEPLOYS_PATH => Some(&DEPLOYS_FILTER[..]),
SSE_API_SIGNATURES_PATH => Some(&SIGNATURES_FILTER[..]),
SSE_API_SIDECAR_PATH => Some(&SIDECAR_FILTER[..]),
_ => None,
}
Expand Down Expand Up @@ -292,8 +322,11 @@ fn parse_query(query: HashMap<String, String>) -> Result<Option<Id>, Response> {
/// Creates a 404 response with a useful error message in the body.
fn create_404() -> Response {
let mut response = Response::new(Body::from(format!(
"invalid path: expected '/{root}' or '/{root}/{sidecar}'\n",
"invalid path: expected '/{root}/{main}', '/{root}/{deploys}' or '/{root}/{sigs} or '/{root}/{sidecar}'\n",
root = SSE_API_ROOT_PATH,
main = SSE_API_MAIN_PATH,
deploys = SSE_API_DEPLOYS_PATH,
sigs = SSE_API_SIGNATURES_PATH,
sidecar = SSE_API_SIDECAR_PATH,
)));
*response.status_mut() = StatusCode::NOT_FOUND;
Expand Down Expand Up @@ -700,6 +733,44 @@ mod tests {
should_filter_out(&finality_signature, &SIDECAR_FILTER[..]).await;
should_not_filter_out(&shutdown, &SIDECAR_FILTER).await;
should_not_filter_out(&sidecar_api_version, &SIDECAR_FILTER[..]).await;

// `EventFilter::Main` should only filter out `DeployAccepted`s and `FinalitySignature`s.
should_not_filter_out(&api_version, &MAIN_FILTER[..]).await;
should_not_filter_out(&block_added, &MAIN_FILTER[..]).await;
should_not_filter_out(&transaction_processed, &MAIN_FILTER[..]).await;
should_not_filter_out(&transaction_expired, &MAIN_FILTER[..]).await;
should_not_filter_out(&fault, &MAIN_FILTER[..]).await;
should_not_filter_out(&step, &MAIN_FILTER[..]).await;
should_not_filter_out(&shutdown, &MAIN_FILTER).await;

should_filter_out(&transaction_accepted, &MAIN_FILTER[..]).await;
should_filter_out(&finality_signature, &MAIN_FILTER[..]).await;

// `EventFilter::DeployAccepted` should filter out everything except `ApiVersion`s and
// `DeployAccepted`s.
should_not_filter_out(&api_version, &DEPLOYS_FILTER[..]).await;
should_not_filter_out(&transaction_accepted, &DEPLOYS_FILTER[..]).await;
should_not_filter_out(&shutdown, &DEPLOYS_FILTER[..]).await;

should_filter_out(&block_added, &DEPLOYS_FILTER[..]).await;
should_filter_out(&transaction_processed, &DEPLOYS_FILTER[..]).await;
should_filter_out(&transaction_expired, &DEPLOYS_FILTER[..]).await;
should_filter_out(&fault, &DEPLOYS_FILTER[..]).await;
should_filter_out(&finality_signature, &DEPLOYS_FILTER[..]).await;
should_filter_out(&step, &DEPLOYS_FILTER[..]).await;

// `EventFilter::Signatures` should filter out everything except `ApiVersion`s and
// `FinalitySignature`s.
should_not_filter_out(&api_version, &SIGNATURES_FILTER[..]).await;
should_not_filter_out(&finality_signature, &SIGNATURES_FILTER[..]).await;
should_not_filter_out(&shutdown, &SIGNATURES_FILTER[..]).await;

should_filter_out(&block_added, &SIGNATURES_FILTER[..]).await;
should_filter_out(&transaction_accepted, &SIGNATURES_FILTER[..]).await;
should_filter_out(&transaction_processed, &SIGNATURES_FILTER[..]).await;
should_filter_out(&transaction_expired, &SIGNATURES_FILTER[..]).await;
should_filter_out(&fault, &SIGNATURES_FILTER[..]).await;
should_filter_out(&step, &SIGNATURES_FILTER[..]).await;
}

/// This test checks that events with incorrect IDs (i.e. no types have an ID except for
Expand Down Expand Up @@ -767,7 +838,13 @@ mod tests {
inbound_filter: None,
};

for filter in &[&EVENTS_FILTER[..], &SIDECAR_FILTER[..]] {
for filter in &[
&EVENTS_FILTER[..],
&SIDECAR_FILTER[..],
&MAIN_FILTER[..],
&DEPLOYS_FILTER[..],
&SIGNATURES_FILTER[..],
] {
should_filter_out(&malformed_api_version, filter).await;
should_filter_out(&malformed_block_added, filter).await;
should_filter_out(&malformed_transaction_accepted, filter).await;
Expand All @@ -781,7 +858,7 @@ mod tests {
}

#[allow(clippy::too_many_lines)]
async fn should_filter_duplicate_events() {
async fn should_filter_duplicate_events(path_filter: &str) {
let mut rng = TestRng::new();

let mut transactions = HashMap::new();
Expand All @@ -792,6 +869,7 @@ mod tests {
&mut rng,
0,
NUM_INITIAL_EVENTS,
path_filter,
&mut transactions,
))
.collect();
Expand All @@ -804,6 +882,7 @@ mod tests {
&mut rng,
*duplicate_count,
&initial_events,
path_filter,
&mut transactions,
);

Expand All @@ -824,15 +903,15 @@ mod tests {
drop(initial_events_sender);
drop(ongoing_events_sender);

let stream_filter = path_to_filter(SSE_API_ROOT_PATH).unwrap();
let stream_filter = path_to_filter(path_filter).unwrap();
#[cfg(feature = "additional-metrics")]
let (tx, rx) = channel(1000);
// Collect the events emitted by `stream_to_client()` - should not contain duplicates.
let received_events: Vec<Result<WarpServerSentEvent, RecvError>> = stream_to_client(
initial_events_receiver,
ongoing_events_receiver,
stream_filter,
get_filter(SSE_API_ROOT_PATH).unwrap(),
get_filter(path_filter).unwrap(),
#[cfg(feature = "additional-metrics")]
tx,
)
Expand Down Expand Up @@ -881,11 +960,28 @@ mod tests {
}
}

#[tokio::test]
async fn should_filter_duplicate_main_events() {
should_filter_duplicate_events(SSE_API_MAIN_PATH).await
}
/// This test checks that deploy-accepted events from the initial stream which are duplicated in
/// the ongoing stream are filtered out.
#[tokio::test]
async fn should_filter_duplicate_deploys_events() {
should_filter_duplicate_events(SSE_API_DEPLOYS_PATH).await
}
/// This test checks that signature events from the initial stream which are duplicated in the
/// ongoing stream are filtered out.
#[tokio::test]
async fn should_filter_duplicate_signature_events() {
should_filter_duplicate_events(SSE_API_SIGNATURES_PATH).await
}

/// This test checks that main events from the initial stream which are duplicated in the
/// ongoing stream are filtered out.
#[tokio::test]
async fn should_filter_duplicate_firehose_events() {
should_filter_duplicate_events().await
should_filter_duplicate_events(SSE_API_ROOT_PATH).await
}

// Returns `count` random SSE events. The events will have sequential IDs starting from `start_id`, and if the path filter
Expand All @@ -895,21 +991,37 @@ mod tests {
rng: &mut TestRng,
start_id: Id,
count: usize,
path_filter: &str,
transactions: &mut HashMap<TransactionHash, Transaction>,
) -> Vec<ServerSentEvent> {
(start_id..(start_id + count as u32))
.map(|id| {
let discriminator = id % 3;
let data = match discriminator {
0 => SseData::random_block_added(rng),
1 => {
let data = match path_filter {
SSE_API_MAIN_PATH => SseData::random_block_added(rng),
SSE_API_DEPLOYS_PATH => {
let (event, transaction) = SseData::random_transaction_accepted(rng);
assert!(transactions
.insert(transaction.hash(), transaction)
.is_none());
event
}
2 => SseData::random_finality_signature(rng),
SSE_API_SIGNATURES_PATH => SseData::random_finality_signature(rng),
SSE_API_ROOT_PATH => {
let discriminator = id % 3;
match discriminator {
0 => SseData::random_block_added(rng),
1 => {
let (event, transaction) =
SseData::random_transaction_accepted(rng);
assert!(transactions
.insert(transaction.hash(), transaction)
.is_none());
event
}
2 => SseData::random_finality_signature(rng),
_ => unreachable!(),
}
}
_ => unreachable!(),
};
ServerSentEvent {
Expand All @@ -929,6 +1041,7 @@ mod tests {
rng: &mut TestRng,
duplicate_count: usize,
initial_events: &[ServerSentEvent],
path_filter: &str,
transactions: &mut HashMap<TransactionHash, Transaction>,
) -> Vec<ServerSentEvent> {
assert!(duplicate_count < initial_events.len());
Expand All @@ -943,6 +1056,7 @@ mod tests {
rng,
unique_start_id,
unique_count,
path_filter,
transactions,
))
.collect()
Expand Down
Loading

0 comments on commit 81ea06d

Please sign in to comment.