Skip to content

Commit

Permalink
Merge branch 'feat-2.0' into 805_update_docs
Browse files Browse the repository at this point in the history
  • Loading branch information
ipopescu committed May 16, 2024
2 parents a262515 + 0cba5c9 commit 038d874
Show file tree
Hide file tree
Showing 32 changed files with 2,342 additions and 502 deletions.
266 changes: 132 additions & 134 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ members = [
anyhow = "1"
async-stream = "0.3.4"
async-trait = "0.1.77"
casper-types = { git = "https://github.com/casper-network/casper-node", branch = "feat-2.0" }
casper-binary-port = { git = "https://github.com/casper-network/casper-node", branch = "feat-2.0" }
casper-types = { git = "https://github.com/casper-network/casper-node.git", branch = "feat-2.0" }
casper-binary-port = { git = "https://github.com/casper-network/casper-node.git", branch = "feat-2.0" }
casper-event-sidecar = { path = "./event_sidecar", version = "1.0.0" }
casper-event-types = { path = "./types", version = "1.0.0" }
casper-rpc-sidecar = { path = "./rpc_sidecar", version = "1.0.0" }
Expand Down
566 changes: 566 additions & 0 deletions LEGACY_SSE_EMULATION.md

Large diffs are not rendered by default.

6 changes: 2 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ emulate_legacy_sse_apis = ["V1"]
```

* `sse_server.enable_server` - If set to true, the SSE server will be enabled.
* `sse_server.emulate_legacy_sse_apis` - A list of legacy Casper node SSE APIs to emulate. The Sidecar will expose SSE endpoints that are compatible with specified versions. Please bear in mind that this feature is an emulation and should be used only for transition periods. In most scenarios, having a 1-to-1 mapping of new messages into old formats is impossible, so this can be a process that loses some data and/or doesn't emit all messages that come from the Casper node. <!--TODO link to new document The details of the emulation are described in the [Event Stream Server SSE legacy emulations](#event-stream-server-sse-legacy-emulations) section.-->
* `sse_server.emulate_legacy_sse_apis` - A list of legacy Casper node SSE APIs to emulate. The Sidecar will expose SSE endpoints that are compatible with specified versions. Please bear in mind that this feature is an emulation and should be used only for transition periods. In most scenarios, having a 1-to-1 mapping of new messages into old formats is impossible, so this can be a process that loses some data and/or doesn't emit all messages that come from the Casper node. See the [Legacy SSE Emulation](./LEGACY_SSE_EMULATION.md) page for more details.

#### Configuring SSE node connections

Expand Down Expand Up @@ -335,9 +335,7 @@ This setting will expose three legacy SSE endpoints with the following events st
* `/events/deploys` - DeployAccepted events
* `/events/main` - All other legacy events, including BlockAdded, DeployProcessed, DeployExpired, Fault, Step, and Shutdown events

<!-- TODO -> fill this in the next PR when mapping is implemented
Those endpoints will emit events in the same format as the V1 SSE API of the Casper node. There are limitations to what the Casper Sidecar can and will do. Here is a list of assumptions:
-->
See the [Legacy SSE Emulation](./LEGACY_SSE_EMULATION.md) page for more details.

#### Configuring the event stream

Expand Down
16 changes: 3 additions & 13 deletions event_sidecar/src/event_stream_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,7 @@ use warp::Filter;
/// that a new client can retrieve the entire set of buffered events if desired.
const ADDITIONAL_PERCENT_FOR_BROADCAST_CHANNEL_SIZE: u32 = 20;

pub type OutboundSender = UnboundedSender<(
Option<EventIndex>,
SseData,
Option<SseFilter>,
Option<String>,
)>;
pub type OutboundSender = UnboundedSender<(Option<EventIndex>, SseData, Option<SseFilter>)>;

#[derive(Debug)]
pub(crate) struct EventStreamServer {
Expand Down Expand Up @@ -115,19 +110,14 @@ impl EventStreamServer {
}

/// Broadcasts the SSE data to all clients connected to the event stream.
pub(crate) fn broadcast(
&mut self,
sse_data: SseData,
inbound_filter: Option<SseFilter>,
maybe_json_data: Option<String>,
) {
pub(crate) fn broadcast(&mut self, sse_data: SseData, inbound_filter: Option<SseFilter>) {
let event_index = match sse_data {
SseData::ApiVersion(..) => None,
_ => Some(self.event_indexer.next_index()),
};
let _ = self
.sse_data_sender
.send((event_index, sse_data, inbound_filter, maybe_json_data));
.send((event_index, sse_data, inbound_filter));
}
}

Expand Down
8 changes: 3 additions & 5 deletions event_sidecar/src/event_stream_server/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ use tokio::{
};
use tracing::{error, info, trace};
use wheelbuf::WheelBuf;
pub type InboundData = (Option<u32>, SseData, Option<Filter>, Option<String>);
pub type OutboundReceiver =
mpsc::UnboundedReceiver<(Option<EventIndex>, SseData, Option<Filter>, Option<String>)>;
pub type InboundData = (Option<u32>, SseData, Option<Filter>);
pub type OutboundReceiver = mpsc::UnboundedReceiver<(Option<EventIndex>, SseData, Option<Filter>)>;
/// Run the HTTP server.
///
/// * `server_with_shutdown` is the actual server as a future which can be gracefully shut down.
Expand Down Expand Up @@ -109,13 +108,12 @@ async fn handle_incoming_data(
broadcaster: &broadcast::Sender<BroadcastChannelMessage>,
) -> Result<(), ()> {
match maybe_data {
Some((maybe_event_index, data, inbound_filter, maybe_json_data)) => {
Some((maybe_event_index, data, inbound_filter)) => {
// Buffer the data and broadcast it to subscribed clients.
trace!("Event stream server received {:?}", data);
let event = ServerSentEvent {
id: maybe_event_index,
data: data.clone(),
json_data: maybe_json_data,
inbound_filter,
};
match data {
Expand Down
89 changes: 55 additions & 34 deletions event_sidecar/src/event_stream_server/sse_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,6 @@ pub(super) struct ServerSentEvent {
pub(super) id: Option<Id>,
/// Payload of the event
pub(super) data: SseData,
#[allow(dead_code)]
/// TODO remove this field in another PR.
/// Optional raw input for the edge-case scenario in which the output needs to receive exactly the same text as we got from inbound.
pub(super) json_data: Option<String>,
/// Information which endpoint we got the event from
pub(super) inbound_filter: Option<SseFilter>,
}
Expand All @@ -121,15 +117,13 @@ impl ServerSentEvent {
ServerSentEvent {
id: None,
data: SseData::ApiVersion(client_api_version),
json_data: None,
inbound_filter: None,
}
}
pub(super) fn sidecar_version_event(version: ProtocolVersion) -> Self {
ServerSentEvent {
id: None,
data: SseData::SidecarVersion(version),
json_data: None,
inbound_filter: None,
}
}
Expand Down Expand Up @@ -672,64 +666,54 @@ mod tests {
let api_version = ServerSentEvent {
id: None,
data: SseData::random_api_version(&mut rng),
json_data: None,
inbound_filter: None,
};
let block_added = ServerSentEvent {
id: Some(rng.gen()),
data: SseData::random_block_added(&mut rng),
json_data: None,
inbound_filter: None,
};
let (sse_data, transaction) = SseData::random_transaction_accepted(&mut rng);
let transaction_accepted = ServerSentEvent {
id: Some(rng.gen()),
data: sse_data,
json_data: None,
inbound_filter: None,
};
let mut transactions = HashMap::new();
let _ = transactions.insert(transaction.hash(), transaction);
let transaction_processed = ServerSentEvent {
id: Some(rng.gen()),
data: SseData::random_transaction_processed(&mut rng),
json_data: None,
inbound_filter: None,
};
let transaction_expired = ServerSentEvent {
id: Some(rng.gen()),
data: SseData::random_transaction_expired(&mut rng),
json_data: None,
inbound_filter: None,
};
let fault = ServerSentEvent {
id: Some(rng.gen()),
data: SseData::random_fault(&mut rng),
json_data: None,
inbound_filter: None,
};
let finality_signature = ServerSentEvent {
id: Some(rng.gen()),
data: SseData::random_finality_signature(&mut rng),
json_data: None,
inbound_filter: None,
};
let step = ServerSentEvent {
id: Some(rng.gen()),
data: SseData::random_step(&mut rng),
json_data: None,
inbound_filter: None,
};
let shutdown = ServerSentEvent {
id: Some(rng.gen()),
data: SseData::Shutdown,
json_data: None,
inbound_filter: Some(SseFilter::Events),
};
let sidecar_api_version = ServerSentEvent {
id: Some(rng.gen()),
data: SseData::random_sidecar_version(&mut rng),
json_data: None,
inbound_filter: None,
};

Expand Down Expand Up @@ -801,58 +785,49 @@ mod tests {
let malformed_api_version = ServerSentEvent {
id: Some(rng.gen()),
data: SseData::random_api_version(&mut rng),
json_data: None,
inbound_filter: None,
};
let malformed_block_added = ServerSentEvent {
id: None,
data: SseData::random_block_added(&mut rng),
json_data: None,
inbound_filter: None,
};
let (sse_data, transaction) = SseData::random_transaction_accepted(&mut rng);
let malformed_transaction_accepted = ServerSentEvent {
id: None,
data: sse_data,
json_data: None,
inbound_filter: None,
};
let mut transactions = HashMap::new();
let _ = transactions.insert(transaction.hash(), transaction);
let malformed_transaction_processed = ServerSentEvent {
id: None,
data: SseData::random_transaction_processed(&mut rng),
json_data: None,
inbound_filter: None,
};
let malformed_transaction_expired = ServerSentEvent {
id: None,
data: SseData::random_transaction_expired(&mut rng),
json_data: None,
inbound_filter: None,
};
let malformed_fault = ServerSentEvent {
id: None,
data: SseData::random_fault(&mut rng),
json_data: None,
inbound_filter: None,
};
let malformed_finality_signature = ServerSentEvent {
id: None,
data: SseData::random_finality_signature(&mut rng),
json_data: None,
inbound_filter: None,
};
let malformed_step = ServerSentEvent {
id: None,
data: SseData::random_step(&mut rng),
json_data: None,
inbound_filter: None,
};
let malformed_shutdown = ServerSentEvent {
id: None,
data: SseData::Shutdown,
json_data: None,
inbound_filter: None,
};

Expand All @@ -876,7 +851,7 @@ mod tests {
}

#[allow(clippy::too_many_lines)]
async fn should_filter_duplicate_events(path_filter: &str) {
async fn should_filter_duplicate_events(path_filter: &str, is_legacy_endpoint: bool) {
let mut rng = TestRng::new();

let mut transactions = HashMap::new();
Expand Down Expand Up @@ -972,19 +947,46 @@ mod tests {
received_event_str = starts_with_data
.replace_all(received_event_str.as_str(), "")
.into_owned();
let received_data =
serde_json::from_str::<Value>(received_event_str.as_str()).unwrap();
let expected_data = serde_json::to_value(&expected_data).unwrap();
assert_eq!(expected_data, received_data);
if is_legacy_endpoint {
let maybe_legacy = LegacySseData::from(&expected_data);
assert!(maybe_legacy.is_some());
let input_legacy = maybe_legacy.unwrap();
let got_legacy =
serde_json::from_str::<LegacySseData>(received_event_str.as_str()).unwrap();
assert_eq!(got_legacy, input_legacy);
} else {
let received_data =
serde_json::from_str::<Value>(received_event_str.as_str()).unwrap();
let expected_data = serde_json::to_value(&expected_data).unwrap();
assert_eq!(expected_data, received_data);
}
}
}
}

/// This test checks that main events (e.g. BlockAdded) from the initial stream which are
/// duplicated in the ongoing stream are filtered out on the legacy `/events/main` endpoint.
/// The second argument marks this as a legacy endpoint, so received events are compared
/// against their `LegacySseData` mapping rather than the raw JSON payload.
#[tokio::test]
async fn should_filter_duplicate_main_events() {
    should_filter_duplicate_events(SSE_API_MAIN_PATH, true).await
}
/// This test checks that deploy-accepted events from the initial stream which are duplicated in
/// the ongoing stream are filtered out.
/// Runs against the legacy `/events/deploys` endpoint, so the `true` flag makes the shared
/// helper validate received events via their `LegacySseData` mapping.
#[tokio::test]
async fn should_filter_duplicate_deploys_events() {
    should_filter_duplicate_events(SSE_API_DEPLOYS_PATH, true).await
}

/// This test checks that signature events from the initial stream which are duplicated in the
/// ongoing stream are filtered out.
/// Runs against the legacy `/events/sigs` endpoint (see `SSE_API_SIGNATURES_PATH`), so the
/// `true` flag makes the shared helper validate events via their `LegacySseData` mapping.
#[tokio::test]
async fn should_filter_duplicate_signature_events() {
    should_filter_duplicate_events(SSE_API_SIGNATURES_PATH, true).await
}

/// This test checks that events from the initial stream which are duplicated in the
/// ongoing stream are filtered out on the firehose (root) endpoint.
/// Unlike the legacy endpoints, the root endpoint serves the new event format, so the
/// helper is told (`false`) to compare raw JSON payloads instead of legacy mappings.
#[tokio::test]
async fn should_filter_duplicate_firehose_events() {
    should_filter_duplicate_events(SSE_API_ROOT_PATH, false).await
}

// Returns `count` random SSE events. The events will have sequential IDs starting from `start_id`, and if the path filter
Expand All @@ -1000,9 +1002,9 @@ mod tests {
(start_id..(start_id + count as u32))
.map(|id| {
let data = match path_filter {
SSE_API_MAIN_PATH => SseData::random_block_added(rng),
SSE_API_MAIN_PATH => make_legacy_compliant_random_block(rng),
SSE_API_DEPLOYS_PATH => {
let (event, transaction) = SseData::random_transaction_accepted(rng);
let (event, transaction) = make_legacy_compliant_random_transaction(rng);
assert!(transactions
.insert(transaction.hash(), transaction)
.is_none());
Expand Down Expand Up @@ -1030,13 +1032,32 @@ mod tests {
ServerSentEvent {
id: Some(id),
data,
json_data: None,
inbound_filter: None,
}
})
.collect()
}

/// Rejection-samples random transaction-accepted events until one is produced that can be
/// represented in the legacy SSE format, then returns that event together with its transaction.
///
/// Used by the legacy-endpoint tests, which must only feed events that survive the
/// V1 emulation mapping.
fn make_legacy_compliant_random_transaction(rng: &mut TestRng) -> (SseData, Transaction) {
    loop {
        let candidate = SseData::random_transaction_accepted(rng);
        // Keep only events that have a legacy representation; retry otherwise.
        if LegacySseData::from(&candidate.0).is_some() {
            return candidate;
        }
    }
}

/// Rejection-samples random `BlockAdded` events until one is produced that can be mapped
/// to the legacy SSE format, and returns it.
///
/// Counterpart of `make_legacy_compliant_random_transaction` for block events; ensures the
/// legacy-endpoint tests never feed an event the V1 emulation cannot express.
fn make_legacy_compliant_random_block(rng: &mut TestRng) -> SseData {
    loop {
        let candidate = SseData::random_block_added(rng);
        // Retry until the generated block has a legacy representation.
        if LegacySseData::from(&candidate).is_some() {
            return candidate;
        }
    }
}

// Returns `NUM_ONGOING_EVENTS` random SSE events for the ongoing stream containing
// duplicates taken from the end of the initial stream. Allows for the full initial stream
// to be duplicated except for its first event (the `ApiVersion` one) which has no ID.
Expand Down
Loading

0 comments on commit 038d874

Please sign in to comment.