Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added documentation for legacy translation. Fixed removed tests. Adde… #301

Merged
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 98 additions & 99 deletions Cargo.lock

Large diffs are not rendered by default.

566 changes: 566 additions & 0 deletions LEGACY_SSE_EMULATION.md

Large diffs are not rendered by default.

16 changes: 1 addition & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -273,22 +273,8 @@ sleep_between_keep_alive_checks_in_seconds = 30

#### Event Stream Server SSE legacy emulations

Currently the only possible emulation is the V1 SSE API. Enabling V1 SSE api emulation requires setting `emulate_legacy_sse_apis` to `["V1"]`, like:
```
[sse_server]
(...)
emulate_legacy_sse_apis = ["V1"]
(...)
```

This will expose three additional sse endpoints:
* `/events/sigs`
* `/events/deploys`
* `/events/main`

Those endpoints will emit events in the same format as the V1 SSE API of the casper node. There are limitations to what Casper Sidecar can and will do, here is a list of assumptions:
Please see [Legacy sse emulation file](./LEGACY_SSE_EMULATION.md)

TODO -> fill this in the next PR when mapping is implemented

### Storage

Expand Down
16 changes: 3 additions & 13 deletions event_sidecar/src/event_stream_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,7 @@ use warp::Filter;
/// that a new client can retrieve the entire set of buffered events if desired.
const ADDITIONAL_PERCENT_FOR_BROADCAST_CHANNEL_SIZE: u32 = 20;

pub type OutboundSender = UnboundedSender<(
Option<EventIndex>,
SseData,
Option<SseFilter>,
Option<String>,
)>;
pub type OutboundSender = UnboundedSender<(Option<EventIndex>, SseData, Option<SseFilter>)>;

#[derive(Debug)]
pub(crate) struct EventStreamServer {
Expand Down Expand Up @@ -115,19 +110,14 @@ impl EventStreamServer {
}

/// Broadcasts the SSE data to all clients connected to the event stream.
pub(crate) fn broadcast(
&mut self,
sse_data: SseData,
inbound_filter: Option<SseFilter>,
maybe_json_data: Option<String>,
) {
pub(crate) fn broadcast(&mut self, sse_data: SseData, inbound_filter: Option<SseFilter>) {
let event_index = match sse_data {
SseData::ApiVersion(..) => None,
_ => Some(self.event_indexer.next_index()),
};
let _ = self
.sse_data_sender
.send((event_index, sse_data, inbound_filter, maybe_json_data));
.send((event_index, sse_data, inbound_filter));
}
}

Expand Down
8 changes: 3 additions & 5 deletions event_sidecar/src/event_stream_server/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ use tokio::{
};
use tracing::{error, info, trace};
use wheelbuf::WheelBuf;
pub type InboundData = (Option<u32>, SseData, Option<Filter>, Option<String>);
pub type OutboundReceiver =
mpsc::UnboundedReceiver<(Option<EventIndex>, SseData, Option<Filter>, Option<String>)>;
pub type InboundData = (Option<u32>, SseData, Option<Filter>);
pub type OutboundReceiver = mpsc::UnboundedReceiver<(Option<EventIndex>, SseData, Option<Filter>)>;
/// Run the HTTP server.
///
/// * `server_with_shutdown` is the actual server as a future which can be gracefully shut down.
Expand Down Expand Up @@ -109,13 +108,12 @@ async fn handle_incoming_data(
broadcaster: &broadcast::Sender<BroadcastChannelMessage>,
) -> Result<(), ()> {
match maybe_data {
Some((maybe_event_index, data, inbound_filter, maybe_json_data)) => {
Some((maybe_event_index, data, inbound_filter)) => {
// Buffer the data and broadcast it to subscribed clients.
trace!("Event stream server received {:?}", data);
let event = ServerSentEvent {
id: maybe_event_index,
data: data.clone(),
json_data: maybe_json_data,
inbound_filter,
};
match data {
Expand Down
89 changes: 55 additions & 34 deletions event_sidecar/src/event_stream_server/sse_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,6 @@ pub(super) struct ServerSentEvent {
pub(super) id: Option<Id>,
/// Payload of the event
pub(super) data: SseData,
#[allow(dead_code)]
/// TODO remove this field in another PR.
/// Optional raw input for the edge-case scenario in which the output needs to receive exactly the same text as we got from inbound.
pub(super) json_data: Option<String>,
/// Information which endpoint we got the event from
pub(super) inbound_filter: Option<SseFilter>,
}
Expand All @@ -121,15 +117,13 @@ impl ServerSentEvent {
ServerSentEvent {
id: None,
data: SseData::ApiVersion(client_api_version),
json_data: None,
inbound_filter: None,
}
}
pub(super) fn sidecar_version_event(version: ProtocolVersion) -> Self {
ServerSentEvent {
id: None,
data: SseData::SidecarVersion(version),
json_data: None,
inbound_filter: None,
}
}
Expand Down Expand Up @@ -672,64 +666,54 @@ mod tests {
let api_version = ServerSentEvent {
id: None,
data: SseData::random_api_version(&mut rng),
json_data: None,
inbound_filter: None,
};
let block_added = ServerSentEvent {
id: Some(rng.gen()),
data: SseData::random_block_added(&mut rng),
json_data: None,
inbound_filter: None,
};
let (sse_data, transaction) = SseData::random_transaction_accepted(&mut rng);
let transaction_accepted = ServerSentEvent {
id: Some(rng.gen()),
data: sse_data,
json_data: None,
inbound_filter: None,
};
let mut transactions = HashMap::new();
let _ = transactions.insert(transaction.hash(), transaction);
let transaction_processed = ServerSentEvent {
id: Some(rng.gen()),
data: SseData::random_transaction_processed(&mut rng),
json_data: None,
inbound_filter: None,
};
let transaction_expired = ServerSentEvent {
id: Some(rng.gen()),
data: SseData::random_transaction_expired(&mut rng),
json_data: None,
inbound_filter: None,
};
let fault = ServerSentEvent {
id: Some(rng.gen()),
data: SseData::random_fault(&mut rng),
json_data: None,
inbound_filter: None,
};
let finality_signature = ServerSentEvent {
id: Some(rng.gen()),
data: SseData::random_finality_signature(&mut rng),
json_data: None,
inbound_filter: None,
};
let step = ServerSentEvent {
id: Some(rng.gen()),
data: SseData::random_step(&mut rng),
json_data: None,
inbound_filter: None,
};
let shutdown = ServerSentEvent {
id: Some(rng.gen()),
data: SseData::Shutdown,
json_data: None,
inbound_filter: Some(SseFilter::Events),
};
let sidecar_api_version = ServerSentEvent {
id: Some(rng.gen()),
data: SseData::random_sidecar_version(&mut rng),
json_data: None,
inbound_filter: None,
};

Expand Down Expand Up @@ -801,58 +785,49 @@ mod tests {
let malformed_api_version = ServerSentEvent {
id: Some(rng.gen()),
data: SseData::random_api_version(&mut rng),
json_data: None,
inbound_filter: None,
};
let malformed_block_added = ServerSentEvent {
id: None,
data: SseData::random_block_added(&mut rng),
json_data: None,
inbound_filter: None,
};
let (sse_data, transaction) = SseData::random_transaction_accepted(&mut rng);
let malformed_transaction_accepted = ServerSentEvent {
id: None,
data: sse_data,
json_data: None,
inbound_filter: None,
};
let mut transactions = HashMap::new();
let _ = transactions.insert(transaction.hash(), transaction);
let malformed_transaction_processed = ServerSentEvent {
id: None,
data: SseData::random_transaction_processed(&mut rng),
json_data: None,
inbound_filter: None,
};
let malformed_transaction_expired = ServerSentEvent {
id: None,
data: SseData::random_transaction_expired(&mut rng),
json_data: None,
inbound_filter: None,
};
let malformed_fault = ServerSentEvent {
id: None,
data: SseData::random_fault(&mut rng),
json_data: None,
inbound_filter: None,
};
let malformed_finality_signature = ServerSentEvent {
id: None,
data: SseData::random_finality_signature(&mut rng),
json_data: None,
inbound_filter: None,
};
let malformed_step = ServerSentEvent {
id: None,
data: SseData::random_step(&mut rng),
json_data: None,
inbound_filter: None,
};
let malformed_shutdown = ServerSentEvent {
id: None,
data: SseData::Shutdown,
json_data: None,
inbound_filter: None,
};

Expand All @@ -876,7 +851,7 @@ mod tests {
}

#[allow(clippy::too_many_lines)]
async fn should_filter_duplicate_events(path_filter: &str) {
async fn should_filter_duplicate_events(path_filter: &str, is_legacy_endpoint: bool) {
let mut rng = TestRng::new();

let mut transactions = HashMap::new();
Expand Down Expand Up @@ -972,19 +947,46 @@ mod tests {
received_event_str = starts_with_data
.replace_all(received_event_str.as_str(), "")
.into_owned();
let received_data =
serde_json::from_str::<Value>(received_event_str.as_str()).unwrap();
let expected_data = serde_json::to_value(&expected_data).unwrap();
assert_eq!(expected_data, received_data);
if is_legacy_endpoint {
let maybe_legacy = LegacySseData::from(&expected_data);
assert!(maybe_legacy.is_some());
let input_legacy = maybe_legacy.unwrap();
let got_legacy =
serde_json::from_str::<LegacySseData>(received_event_str.as_str()).unwrap();
assert_eq!(got_legacy, input_legacy);
} else {
let received_data =
serde_json::from_str::<Value>(received_event_str.as_str()).unwrap();
let expected_data = serde_json::to_value(&expected_data).unwrap();
assert_eq!(expected_data, received_data);
}
}
}
}

#[tokio::test]
async fn should_filter_duplicate_main_events() {
should_filter_duplicate_events(SSE_API_MAIN_PATH, true).await
}
/// This test checks that deploy-accepted events from the initial stream which are duplicated in
/// the ongoing stream are filtered out.
#[tokio::test]
async fn should_filter_duplicate_deploys_events() {
should_filter_duplicate_events(SSE_API_DEPLOYS_PATH, true).await
}

/// This test checks that signature events from the initial stream which are duplicated in the
/// ongoing stream are filtered out.
#[tokio::test]
async fn should_filter_duplicate_signature_events() {
should_filter_duplicate_events(SSE_API_SIGNATURES_PATH, true).await
}

/// This test checks that main events from the initial stream which are duplicated in the
/// ongoing stream are filtered out.
#[tokio::test]
async fn should_filter_duplicate_firehose_events() {
should_filter_duplicate_events(SSE_API_ROOT_PATH).await
should_filter_duplicate_events(SSE_API_ROOT_PATH, false).await
}

// Returns `count` random SSE events. The events will have sequential IDs starting from `start_id`, and if the path filter
Expand All @@ -1000,9 +1002,9 @@ mod tests {
(start_id..(start_id + count as u32))
.map(|id| {
let data = match path_filter {
SSE_API_MAIN_PATH => SseData::random_block_added(rng),
SSE_API_MAIN_PATH => make_legacy_compliant_random_block(rng),
SSE_API_DEPLOYS_PATH => {
let (event, transaction) = SseData::random_transaction_accepted(rng);
let (event, transaction) = make_legacy_compliant_random_transaction(rng);
assert!(transactions
.insert(transaction.hash(), transaction)
.is_none());
Expand Down Expand Up @@ -1030,13 +1032,32 @@ mod tests {
ServerSentEvent {
id: Some(id),
data,
json_data: None,
inbound_filter: None,
}
})
.collect()
}

fn make_legacy_compliant_random_transaction(rng: &mut TestRng) -> (SseData, Transaction) {
loop {
let (event, transaction) = SseData::random_transaction_accepted(rng);
let legacy = LegacySseData::from(&event);
if legacy.is_some() {
return (event, transaction);
}
}
}

fn make_legacy_compliant_random_block(rng: &mut TestRng) -> SseData {
loop {
let block = SseData::random_block_added(rng);
let legacy = LegacySseData::from(&block);
if legacy.is_some() {
return block;
}
}
}

// Returns `NUM_ONGOING_EVENTS` random SSE events for the ongoing stream containing
// duplicates taken from the end of the initial stream. Allows for the full initial stream
// to be duplicated except for its first event (the `ApiVersion` one) which has no ID.
Expand Down
Loading
Loading