Commit
Added documentation for legacy translation. Fixed removed tests. Added more unit tests for the 2.x to 1.x translation process
Jakub Zajkowski committed May 8, 2024
1 parent e91a4e0 commit 16aeb1e
Showing 19 changed files with 1,544 additions and 280 deletions.
197 changes: 98 additions & 99 deletions Cargo.lock

Large diffs are not rendered by default.

512 changes: 512 additions & 0 deletions LEGACY_SSE_EMULATION.md

Large diffs are not rendered by default.

16 changes: 1 addition & 15 deletions README.md
@@ -273,22 +273,8 @@ sleep_between_keep_alive_checks_in_seconds = 30

#### Event Stream Server SSE legacy emulations

Currently the only possible emulation is the V1 SSE API. Enabling V1 SSE API emulation requires setting `emulate_legacy_sse_apis` to `["V1"]`, for example:
```
[sse_server]
(...)
emulate_legacy_sse_apis = ["V1"]
(...)
```

This will expose three additional SSE endpoints:
* `/events/sigs`
* `/events/deploys`
* `/events/main`

Those endpoints will emit events in the same format as the V1 SSE API of the Casper node. There are limitations to what Casper Sidecar can and will do; here is a list of assumptions:
Please see the [Legacy SSE emulation file](./LEGACY_SSE_EMULATION.md).

TODO -> fill this in the next PR when mapping is implemented
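
For illustration only (not part of this diff), here is a minimal sketch of how a client might consume one of the emulated V1 endpoints. The port, the crate choices (`reqwest` with its `stream` feature, `futures-util`, `tokio`), and the lack of reconnect handling are assumptions, not something this commit prescribes:

```rust
// Hypothetical client sketch: stream raw SSE bytes from the emulated V1 endpoint.
// Assumes the sidecar's SSE server is reachable on localhost:19999 (an assumption,
// not taken from this commit) and that `reqwest` is built with the "stream" feature.
use futures_util::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let response = reqwest::get("http://localhost:19999/events/main").await?;
    let mut body = response.bytes_stream();
    while let Some(chunk) = body.next().await {
        // Each chunk is a fragment of the SSE byte stream; a real client would
        // reassemble `data:` lines into complete events before parsing the JSON.
        print!("{}", String::from_utf8_lossy(&chunk?));
    }
    Ok(())
}
```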

### Storage

16 changes: 3 additions & 13 deletions event_sidecar/src/event_stream_server.rs
@@ -50,12 +50,7 @@ use warp::Filter;
/// that a new client can retrieve the entire set of buffered events if desired.
const ADDITIONAL_PERCENT_FOR_BROADCAST_CHANNEL_SIZE: u32 = 20;

pub type OutboundSender = UnboundedSender<(
Option<EventIndex>,
SseData,
Option<SseFilter>,
Option<String>,
)>;
pub type OutboundSender = UnboundedSender<(Option<EventIndex>, SseData, Option<SseFilter>)>;

#[derive(Debug)]
pub(crate) struct EventStreamServer {
@@ -115,19 +110,14 @@ impl EventStreamServer {
}

/// Broadcasts the SSE data to all clients connected to the event stream.
pub(crate) fn broadcast(
&mut self,
sse_data: SseData,
inbound_filter: Option<SseFilter>,
maybe_json_data: Option<String>,
) {
pub(crate) fn broadcast(&mut self, sse_data: SseData, inbound_filter: Option<SseFilter>) {
let event_index = match sse_data {
SseData::ApiVersion(..) => None,
_ => Some(self.event_indexer.next_index()),
};
let _ = self
.sse_data_sender
.send((event_index, sse_data, inbound_filter, maybe_json_data));
.send((event_index, sse_data, inbound_filter));
}
}

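A hedged sketch of a call site for the narrowed `broadcast` signature; the wrapper function below is hypothetical, and only the two-argument call reflects this diff:

```rust
// Hypothetical call site: the caller now passes only the event data and the filter
// the event arrived on; the raw JSON string is no longer threaded through.
fn forward_event(server: &mut EventStreamServer, data: SseData, filter: Option<SseFilter>) {
    // The server assigns the next event index itself (None for ApiVersion events).
    server.broadcast(data, filter);
}
```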
8 changes: 3 additions & 5 deletions event_sidecar/src/event_stream_server/http_server.rs
@@ -17,9 +17,8 @@ use tokio::{
};
use tracing::{error, info, trace};
use wheelbuf::WheelBuf;
pub type InboundData = (Option<u32>, SseData, Option<Filter>, Option<String>);
pub type OutboundReceiver =
mpsc::UnboundedReceiver<(Option<EventIndex>, SseData, Option<Filter>, Option<String>)>;
pub type InboundData = (Option<u32>, SseData, Option<Filter>);
pub type OutboundReceiver = mpsc::UnboundedReceiver<(Option<EventIndex>, SseData, Option<Filter>)>;
/// Run the HTTP server.
///
/// * `server_with_shutdown` is the actual server as a future which can be gracefully shut down.
@@ -109,13 +108,12 @@ async fn handle_incoming_data(
broadcaster: &broadcast::Sender<BroadcastChannelMessage>,
) -> Result<(), ()> {
match maybe_data {
Some((maybe_event_index, data, inbound_filter, maybe_json_data)) => {
Some((maybe_event_index, data, inbound_filter)) => {
// Buffer the data and broadcast it to subscribed clients.
trace!("Event stream server received {:?}", data);
let event = ServerSentEvent {
id: maybe_event_index,
data: data.clone(),
json_data: maybe_json_data,
inbound_filter,
};
match data {
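
For orientation, a small sketch of how the narrowed channel types fit together; the helper function is hypothetical, while the tuple shape mirrors the `InboundData`/`OutboundReceiver` aliases above:

```rust
// Hypothetical wiring: the event stream server keeps the sending half and this HTTP
// module consumes the matching receiver; the tuple no longer carries an
// Option<String> with the original JSON text.
use tokio::sync::mpsc;

fn make_outbound_channel() -> (
    mpsc::UnboundedSender<(Option<EventIndex>, SseData, Option<Filter>)>,
    OutboundReceiver,
) {
    mpsc::unbounded_channel()
}
```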
89 changes: 55 additions & 34 deletions event_sidecar/src/event_stream_server/sse_server.rs
@@ -107,10 +107,6 @@ pub(super) struct ServerSentEvent {
pub(super) id: Option<Id>,
/// Payload of the event
pub(super) data: SseData,
#[allow(dead_code)]
/// TODO remove this field in another PR.
/// Optional raw input for the edge-case scenario in which the output needs to receive exactly the same text as we got from inbound.
pub(super) json_data: Option<String>,
/// Information which endpoint we got the event from
pub(super) inbound_filter: Option<SseFilter>,
}
@@ -121,15 +117,13 @@ impl ServerSentEvent {
ServerSentEvent {
id: None,
data: SseData::ApiVersion(client_api_version),
json_data: None,
inbound_filter: None,
}
}
pub(super) fn sidecar_version_event(version: ProtocolVersion) -> Self {
ServerSentEvent {
id: None,
data: SseData::SidecarVersion(version),
json_data: None,
inbound_filter: None,
}
}
@@ -672,64 +666,54 @@ mod tests {
let api_version = ServerSentEvent {
id: None,
data: SseData::random_api_version(&mut rng),
json_data: None,
inbound_filter: None,
};
let block_added = ServerSentEvent {
id: Some(rng.gen()),
data: SseData::random_block_added(&mut rng),
json_data: None,
inbound_filter: None,
};
let (sse_data, transaction) = SseData::random_transaction_accepted(&mut rng);
let transaction_accepted = ServerSentEvent {
id: Some(rng.gen()),
data: sse_data,
json_data: None,
inbound_filter: None,
};
let mut transactions = HashMap::new();
let _ = transactions.insert(transaction.hash(), transaction);
let transaction_processed = ServerSentEvent {
id: Some(rng.gen()),
data: SseData::random_transaction_processed(&mut rng),
json_data: None,
inbound_filter: None,
};
let transaction_expired = ServerSentEvent {
id: Some(rng.gen()),
data: SseData::random_transaction_expired(&mut rng),
json_data: None,
inbound_filter: None,
};
let fault = ServerSentEvent {
id: Some(rng.gen()),
data: SseData::random_fault(&mut rng),
json_data: None,
inbound_filter: None,
};
let finality_signature = ServerSentEvent {
id: Some(rng.gen()),
data: SseData::random_finality_signature(&mut rng),
json_data: None,
inbound_filter: None,
};
let step = ServerSentEvent {
id: Some(rng.gen()),
data: SseData::random_step(&mut rng),
json_data: None,
inbound_filter: None,
};
let shutdown = ServerSentEvent {
id: Some(rng.gen()),
data: SseData::Shutdown,
json_data: None,
inbound_filter: Some(SseFilter::Events),
};
let sidecar_api_version = ServerSentEvent {
id: Some(rng.gen()),
data: SseData::random_sidecar_version(&mut rng),
json_data: None,
inbound_filter: None,
};

@@ -801,58 +785,49 @@ mod tests {
let malformed_api_version = ServerSentEvent {
id: Some(rng.gen()),
data: SseData::random_api_version(&mut rng),
json_data: None,
inbound_filter: None,
};
let malformed_block_added = ServerSentEvent {
id: None,
data: SseData::random_block_added(&mut rng),
json_data: None,
inbound_filter: None,
};
let (sse_data, transaction) = SseData::random_transaction_accepted(&mut rng);
let malformed_transaction_accepted = ServerSentEvent {
id: None,
data: sse_data,
json_data: None,
inbound_filter: None,
};
let mut transactions = HashMap::new();
let _ = transactions.insert(transaction.hash(), transaction);
let malformed_transaction_processed = ServerSentEvent {
id: None,
data: SseData::random_transaction_processed(&mut rng),
json_data: None,
inbound_filter: None,
};
let malformed_transaction_expired = ServerSentEvent {
id: None,
data: SseData::random_transaction_expired(&mut rng),
json_data: None,
inbound_filter: None,
};
let malformed_fault = ServerSentEvent {
id: None,
data: SseData::random_fault(&mut rng),
json_data: None,
inbound_filter: None,
};
let malformed_finality_signature = ServerSentEvent {
id: None,
data: SseData::random_finality_signature(&mut rng),
json_data: None,
inbound_filter: None,
};
let malformed_step = ServerSentEvent {
id: None,
data: SseData::random_step(&mut rng),
json_data: None,
inbound_filter: None,
};
let malformed_shutdown = ServerSentEvent {
id: None,
data: SseData::Shutdown,
json_data: None,
inbound_filter: None,
};

@@ -876,7 +851,7 @@
}

#[allow(clippy::too_many_lines)]
async fn should_filter_duplicate_events(path_filter: &str) {
async fn should_filter_duplicate_events(path_filter: &str, is_legacy_endpoint: bool) {
let mut rng = TestRng::new();

let mut transactions = HashMap::new();
@@ -972,19 +947,46 @@
received_event_str = starts_with_data
.replace_all(received_event_str.as_str(), "")
.into_owned();
let received_data =
serde_json::from_str::<Value>(received_event_str.as_str()).unwrap();
let expected_data = serde_json::to_value(&expected_data).unwrap();
assert_eq!(expected_data, received_data);
if is_legacy_endpoint {
let maybe_legacy = LegacySseData::from(&expected_data);
assert!(maybe_legacy.is_some());
let input_legacy = maybe_legacy.unwrap();
let got_legacy =
serde_json::from_str::<LegacySseData>(received_event_str.as_str()).unwrap();
assert_eq!(got_legacy, input_legacy);
} else {
let received_data =
serde_json::from_str::<Value>(received_event_str.as_str()).unwrap();
let expected_data = serde_json::to_value(&expected_data).unwrap();
assert_eq!(expected_data, received_data);
}
}
}
}
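
The legacy branch above can be read as the following helper, extracted here purely as a sketch; the helper name is hypothetical, while `LegacySseData::from` and the comparison come from the diff:

```rust
// Sketch of the assertion performed for legacy endpoints: translate the expected
// 2.x event into its 1.x shape and compare it with what the client actually received.
fn assert_legacy_roundtrip(expected: &SseData, received_json: &str) {
    let expected_legacy = LegacySseData::from(expected)
        .expect("event should have a legacy (V1) representation");
    let received_legacy: LegacySseData = serde_json::from_str(received_json)
        .expect("received event should deserialize as legacy SSE data");
    assert_eq!(received_legacy, expected_legacy);
}
```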

#[tokio::test]
async fn should_filter_duplicate_main_events() {
should_filter_duplicate_events(SSE_API_MAIN_PATH, true).await
}
/// This test checks that deploy-accepted events from the initial stream which are duplicated in
/// the ongoing stream are filtered out.
#[tokio::test]
async fn should_filter_duplicate_deploys_events() {
should_filter_duplicate_events(SSE_API_DEPLOYS_PATH, true).await
}

/// This test checks that signature events from the initial stream which are duplicated in the
/// ongoing stream are filtered out.
#[tokio::test]
async fn should_filter_duplicate_signature_events() {
should_filter_duplicate_events(SSE_API_SIGNATURES_PATH, true).await
}

/// This test checks that firehose (root endpoint) events from the initial stream which are
/// duplicated in the ongoing stream are filtered out.
#[tokio::test]
async fn should_filter_duplicate_firehose_events() {
should_filter_duplicate_events(SSE_API_ROOT_PATH).await
should_filter_duplicate_events(SSE_API_ROOT_PATH, false).await
}

// Returns `count` random SSE events. The events will have sequential IDs starting from `start_id`, and if the path filter
@@ -1000,9 +1002,9 @@
(start_id..(start_id + count as u32))
.map(|id| {
let data = match path_filter {
SSE_API_MAIN_PATH => SseData::random_block_added(rng),
SSE_API_MAIN_PATH => make_legacy_compliant_random_block(rng),
SSE_API_DEPLOYS_PATH => {
let (event, transaction) = SseData::random_transaction_accepted(rng);
let (event, transaction) = make_legacy_compliant_random_transaction(rng);
assert!(transactions
.insert(transaction.hash(), transaction)
.is_none());
@@ -1030,13 +1032,32 @@
ServerSentEvent {
id: Some(id),
data,
json_data: None,
inbound_filter: None,
}
})
.collect()
}

fn make_legacy_compliant_random_transaction(rng: &mut TestRng) -> (SseData, Transaction) {
loop {
let (event, transaction) = SseData::random_transaction_accepted(rng);
let legacy = LegacySseData::from(&event);
if legacy.is_some() {
return (event, transaction);
}
}
}

fn make_legacy_compliant_random_block(rng: &mut TestRng) -> SseData {
loop {
let block = SseData::random_block_added(rng);
let legacy = LegacySseData::from(&block);
if legacy.is_some() {
return block;
}
}
}

// Returns `NUM_ONGOING_EVENTS` random SSE events for the ongoing stream containing
// duplicates taken from the end of the initial stream. Allows for the full initial stream
// to be duplicated except for its first event (the `ApiVersion` one) which has no ID.
