Skip to content

Commit

Permalink
Mapping for legacy filters (#290)
Browse files Browse the repository at this point in the history
* Implementing mapping of contemporary 2.x sse events to legacy 1.x /events/main /events/deploys and /events/sigs formats

---------

Co-authored-by: Jakub Zajkowski <[email protected]>
  • Loading branch information
zajko and Jakub Zajkowski authored May 7, 2024
1 parent 2f47b5b commit fb41154
Show file tree
Hide file tree
Showing 19 changed files with 1,757 additions and 400 deletions.
313 changes: 153 additions & 160 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ casper-rpc-sidecar = { path = "./rpc_sidecar", version = "1.0.0" }
datasize = "0.2.11"
futures = "0"
futures-util = "0.3.28"
itertools = "0.10.3"
metrics = { path = "./metrics", version = "1.0.0" }
once_cell = "1.18.0"
thiserror = "1"
Expand Down
2 changes: 1 addition & 1 deletion event_sidecar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ hex_fmt = "0.3.0"
http = "0.2.1"
hyper = "0.14.4"
indexmap = "2.0.0"
itertools = "0.10.3"
itertools = { workspace = true }
jsonschema = "0.17.1"
metrics = { workspace = true }
pin-project = "1.1.5"
Expand Down
164 changes: 75 additions & 89 deletions event_sidecar/src/event_stream_server/sse_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@
use super::endpoint::Endpoint;
#[cfg(feature = "additional-metrics")]
use crate::utils::start_metrics_thread;
use casper_event_types::{sse_data::EventFilter, sse_data::SseData, Filter as SseFilter};
use casper_event_types::{
legacy_sse_data::LegacySseData,
sse_data::{EventFilter, SseData},
Filter as SseFilter,
};
use casper_types::{ProtocolVersion, Transaction};
use futures::{future, Stream, StreamExt};
use http::StatusCode;
use hyper::Body;
use serde::Serialize;
#[cfg(test)]
use serde_json::Value;
use std::{
collections::{HashMap, HashSet},
Expand Down Expand Up @@ -81,7 +86,13 @@ const SIGNATURES_FILTER: [EventFilter; 2] =
const SIDECAR_FILTER: [EventFilter; 1] = [EventFilter::SidecarVersion];
/// The "id" field of the events sent on the event stream to clients.
pub type Id = u32;
type UrlProps = (&'static [EventFilter], &'static Endpoint, Option<u32>);
pub type IsLegacyFilter = bool;
type UrlProps = (
&'static [EventFilter],
&'static Endpoint,
Option<u32>,
IsLegacyFilter,
);

#[derive(Serialize)]
#[serde(rename_all = "PascalCase")]
Expand All @@ -96,6 +107,8 @@ pub(super) struct ServerSentEvent {
pub(super) id: Option<Id>,
/// Payload of the event
pub(super) data: SseData,
#[allow(dead_code)]
/// TODO remove this field in another PR.
/// Optional raw input for the edge-case scenario in which the output needs to receive exactly the same text as we got from inbound.
pub(super) json_data: Option<String>,
/// Information which endpoint we got the event from
Expand Down Expand Up @@ -136,18 +149,31 @@ pub(super) enum BroadcastChannelMessage {
Shutdown,
}

fn event_to_warp_event(event: &ServerSentEvent) -> warp::sse::Event {
let maybe_value = event
.json_data
.as_ref()
.map(|el| serde_json::from_str::<Value>(el).unwrap());
match &maybe_value {
Some(json_data) => WarpServerSentEvent::default().json_data(json_data),
None => WarpServerSentEvent::default().json_data(&event.data),
}
.unwrap_or_else(|error| {
warn!(%error, ?event, "failed to jsonify sse event");
WarpServerSentEvent::default()
fn event_to_warp_event(
event: &ServerSentEvent,
is_legacy_filter: bool,
maybe_id: Option<String>,
) -> Option<Result<WarpServerSentEvent, RecvError>> {
let warp_data = WarpServerSentEvent::default();
let maybe_event = if is_legacy_filter {
let legacy_data = LegacySseData::from(&event.data);
legacy_data.map(|data| {
warp_data.json_data(&data).unwrap_or_else(|error| {
warn!(%error, ?event, "failed to jsonify sse event");
WarpServerSentEvent::default()
})
})
} else {
Some(warp_data.json_data(&event.data).unwrap_or_else(|error| {
warn!(%error, ?event, "failed to jsonify sse event");
WarpServerSentEvent::default()
}))
};
maybe_event.map(|mut event| {
if let Some(id) = maybe_id {
event = event.id(id);
}
Ok(event)
})
}

Expand All @@ -165,6 +191,7 @@ async fn filter_map_server_sent_event(
event: &ServerSentEvent,
stream_filter: &Endpoint,
event_filter: &[EventFilter],
is_legacy_filter: bool,
) -> Option<Result<WarpServerSentEvent, RecvError>> {
if !event.data.should_include(event_filter) {
return None;
Expand All @@ -176,21 +203,15 @@ async fn filter_map_server_sent_event(

match &event.data {
&SseData::ApiVersion { .. } | &SseData::SidecarVersion { .. } => {
let warp_event = event_to_warp_event(event);
Some(Ok(warp_event))
event_to_warp_event(event, is_legacy_filter, None)
}
&SseData::BlockAdded { .. }
| &SseData::TransactionProcessed { .. }
| &SseData::TransactionExpired { .. }
| &SseData::Fault { .. }
| &SseData::Step { .. }
| &SseData::FinalitySignature(_) => {
let warp_event = event_to_warp_event(event).id(id);
Some(Ok(warp_event))
}
SseData::TransactionAccepted(transaction) => {
handle_transaction_accepted(event, transaction, &id)
}
| &SseData::TransactionAccepted(..)
| &SseData::FinalitySignature(_) => event_to_warp_event(event, is_legacy_filter, Some(id)),
&SseData::Shutdown => {
if should_send_shutdown(event, stream_filter) {
build_event_for_outbound(event, id)
Expand All @@ -209,32 +230,6 @@ fn should_send_shutdown(event: &ServerSentEvent, stream_filter: &Endpoint) -> bo
}
}

fn handle_transaction_accepted(
event: &ServerSentEvent,
transaction: &Arc<Transaction>,
id: &String,
) -> Option<Result<WarpServerSentEvent, RecvError>> {
let maybe_value = event
.json_data
.as_ref()
.map(|el| serde_json::from_str::<Value>(el).unwrap());
let warp_event = match maybe_value {
Some(json_data) => WarpServerSentEvent::default().json_data(json_data),
None => {
let transaction_accepted = &TransactionAccepted {
transaction_accepted: transaction.clone(),
};
WarpServerSentEvent::default().json_data(transaction_accepted)
}
}
.unwrap_or_else(|error| {
warn!(%error, ?event, "failed to jsonify sse event");
WarpServerSentEvent::default()
})
.id(id);
Some(Ok(warp_event))
}

fn determine_id(event: &ServerSentEvent) -> Option<String> {
match event.id {
Some(id) => {
Expand All @@ -261,13 +256,9 @@ fn build_event_for_outbound(
event: &ServerSentEvent,
id: String,
) -> Option<Result<WarpServerSentEvent, RecvError>> {
let maybe_value = event
.json_data
.as_ref()
.map(|el| serde_json::from_str::<Value>(el).unwrap())
.unwrap_or_else(|| serde_json::to_value(&event.data).unwrap());
let json_value = serde_json::to_value(&event.data).unwrap();
Some(Ok(WarpServerSentEvent::default()
.json_data(&maybe_value)
.json_data(&json_value)
.unwrap_or_else(|error| {
warn!(%error, ?event, "failed to jsonify sse event");
WarpServerSentEvent::default()
Expand All @@ -292,13 +283,13 @@ pub(super) fn path_to_filter(
pub(super) fn get_filter(
path_param: &str,
enable_legacy_filters: bool,
) -> Option<&'static [EventFilter]> {
) -> Option<(&'static [EventFilter], bool)> {
match path_param {
SSE_API_ROOT_PATH => Some(&EVENTS_FILTER[..]),
SSE_API_MAIN_PATH if enable_legacy_filters => Some(&MAIN_FILTER[..]),
SSE_API_DEPLOYS_PATH if enable_legacy_filters => Some(&DEPLOYS_FILTER[..]),
SSE_API_SIGNATURES_PATH if enable_legacy_filters => Some(&SIGNATURES_FILTER[..]),
SSE_API_SIDECAR_PATH => Some(&SIDECAR_FILTER[..]),
SSE_API_ROOT_PATH => Some((&EVENTS_FILTER[..], false)),
SSE_API_MAIN_PATH if enable_legacy_filters => Some((&MAIN_FILTER[..], true)),
SSE_API_DEPLOYS_PATH if enable_legacy_filters => Some((&DEPLOYS_FILTER[..], true)),
SSE_API_SIGNATURES_PATH if enable_legacy_filters => Some((&SIGNATURES_FILTER[..], true)),
SSE_API_SIDECAR_PATH => Some((&SIDECAR_FILTER[..], false)),
_ => None,
}
}
Expand Down Expand Up @@ -386,7 +377,7 @@ fn serve_sse_response_handler(
if let Some(value) = validate(&cloned_broadcaster, max_concurrent_subscribers) {
return value;
}
let (event_filter, stream_filter, start_from) =
let (event_filter, stream_filter, start_from, is_legacy_filter) =
match parse_url_props(maybe_path_param, query, enable_legacy_filters) {
Ok(value) => value,
Err(error_response) => return error_response,
Expand Down Expand Up @@ -416,6 +407,7 @@ fn serve_sse_response_handler(
ongoing_events_receiver,
stream_filter,
event_filter,
is_legacy_filter,
#[cfg(feature = "additional-metrics")]
metrics_sender,
)))
Expand All @@ -428,10 +420,11 @@ fn parse_url_props(
enable_legacy_filters: bool,
) -> Result<UrlProps, http::Response<Body>> {
let path_param = maybe_path_param.unwrap_or_else(|| SSE_API_ROOT_PATH.to_string());
let event_filter = match get_filter(path_param.as_str(), enable_legacy_filters) {
Some(filter) => filter,
None => return Err(create_404(enable_legacy_filters)),
};
let (event_filter, is_legacy_filter) =
match get_filter(path_param.as_str(), enable_legacy_filters) {
Some((filter, is_legacy_filter)) => (filter, is_legacy_filter),
None => return Err(create_404(enable_legacy_filters)),
};
let stream_filter = match path_to_filter(path_param.as_str(), enable_legacy_filters) {
Some(filter) => filter,
None => return Err(create_404(enable_legacy_filters)),
Expand All @@ -440,7 +433,7 @@ fn parse_url_props(
Ok(maybe_id) => maybe_id,
Err(error_response) => return Err(error_response),
};
Ok((event_filter, stream_filter, start_from))
Ok((event_filter, stream_filter, start_from, is_legacy_filter))
}

fn validate(
Expand Down Expand Up @@ -530,6 +523,7 @@ fn stream_to_client(
ongoing_events: broadcast::Receiver<BroadcastChannelMessage>,
stream_filter: &'static Endpoint,
event_filter: &'static [EventFilter],
is_legacy_filter: bool,
#[cfg(feature = "additional-metrics")] metrics_sender: Sender<()>,
) -> impl Stream<Item = Result<WarpServerSentEvent, RecvError>> + 'static {
// Keep a record of the IDs of the events delivered via the `initial_events` receiver.
Expand Down Expand Up @@ -559,6 +553,7 @@ fn stream_to_client(
ongoing_stream,
stream_filter,
event_filter,
is_legacy_filter,
)
}

Expand All @@ -571,6 +566,7 @@ fn build_combined_events_stream(
>,
stream_filter: &'static Endpoint,
event_filter: &'static [EventFilter],
is_legacy_filter: bool,
) -> impl Stream<Item = Result<WarpServerSentEvent, RecvError>> + 'static {
UnboundedReceiverStream::new(initial_events)
.map(move |event| {
Expand All @@ -588,8 +584,13 @@ fn build_combined_events_stream(
let sender = metrics_sender;
match result {
Ok(event) => {
let fitlered_data =
filter_map_server_sent_event(&event, stream_filter, event_filter).await;
let fitlered_data = filter_map_server_sent_event(
&event,
stream_filter,
event_filter,
is_legacy_filter,
)
.await;
#[cfg(feature = "additional-metrics")]
if let Some(_) = fitlered_data {
let _ = sender.clone().send(()).await;
Expand Down Expand Up @@ -641,7 +642,7 @@ mod tests {

async fn should_filter_out(event: &ServerSentEvent, filter: &'static [EventFilter]) {
assert!(
filter_map_server_sent_event(event, &Endpoint::Events, filter)
filter_map_server_sent_event(event, &Endpoint::Events, filter, false)
.await
.is_none(),
"should filter out {:?} with {:?}",
Expand All @@ -652,7 +653,7 @@ mod tests {

async fn should_not_filter_out(event: &ServerSentEvent, filter: &'static [EventFilter]) {
assert!(
filter_map_server_sent_event(event, &Endpoint::Events, filter)
filter_map_server_sent_event(event, &Endpoint::Events, filter, false)
.await
.is_some(),
"should not filter out {:?} with {:?}",
Expand Down Expand Up @@ -923,12 +924,14 @@ mod tests {
let stream_filter = path_to_filter(path_filter, true).unwrap();
#[cfg(feature = "additional-metrics")]
let (tx, rx) = channel(1000);
let (filter, is_legacy_filter) = get_filter(path_filter, true).unwrap();
// Collect the events emitted by `stream_to_client()` - should not contain duplicates.
let received_events: Vec<Result<WarpServerSentEvent, RecvError>> = stream_to_client(
initial_events_receiver,
ongoing_events_receiver,
stream_filter,
get_filter(path_filter, true).unwrap(),
filter,
is_legacy_filter,
#[cfg(feature = "additional-metrics")]
tx,
)
Expand Down Expand Up @@ -977,23 +980,6 @@ mod tests {
}
}

#[tokio::test]
async fn should_filter_duplicate_main_events() {
should_filter_duplicate_events(SSE_API_MAIN_PATH).await
}
/// This test checks that deploy-accepted events from the initial stream which are duplicated in
/// the ongoing stream are filtered out.
#[tokio::test]
async fn should_filter_duplicate_deploys_events() {
should_filter_duplicate_events(SSE_API_DEPLOYS_PATH).await
}
/// This test checks that signature events from the initial stream which are duplicated in the
/// ongoing stream are filtered out.
#[tokio::test]
async fn should_filter_duplicate_signature_events() {
should_filter_duplicate_events(SSE_API_SIGNATURES_PATH).await
}

/// This test checks that main events from the initial stream which are duplicated in the
/// ongoing stream are filtered out.
#[tokio::test]
Expand Down
Loading

0 comments on commit fb41154

Please sign in to comment.