ConnectionManager uses "?start_from=0" on every first connection to an endpoint so we have the possibility to catch up in case of sidecar restart. Previously we would use start_from=0 only if it was a cold start (empty database) (#211)
zajko authored Oct 26, 2023
1 parent 906022e commit 1e303b9
Showing 6 changed files with 18 additions and 38 deletions.
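The gist of the change can be sketched as follows (a minimal sketch with hypothetical helper names, not the sidecar's actual API): the `start_from` event id for a filter previously fell back to 0 only on a cold start, and now falls back to 0 whenever no last event id is recorded.

```rust
// Hypothetical sketch of the behavior change; not the sidecar's real API.
fn start_from_before(is_empty_database: bool, last_seen: Option<u32>) -> Option<u32> {
    // Before this commit: fall back to 0 only on a cold start (empty database).
    if is_empty_database && last_seen.is_none() {
        Some(0)
    } else {
        last_seen
    }
}

fn start_from_after(last_seen: Option<u32>) -> Option<u32> {
    // After this commit: any filter with no recorded event id starts from 0,
    // so the node's cached events are replayed even after a sidecar restart.
    last_seen.or(Some(0))
}

fn main() {
    // Warm restart: database is non-empty but this filter has no recorded id.
    assert_eq!(start_from_before(false, None), None); // old: node cache skipped
    assert_eq!(start_from_after(None), Some(0)); // new: catch up from 0
    // A recorded last event id is always preferred.
    assert_eq!(start_from_after(Some(42)), Some(42));
}
```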
12 changes: 10 additions & 2 deletions README.md
@@ -60,6 +60,8 @@ delay_between_retries_in_seconds = 5
allow_partial_connection = false
enable_logging = false
connection_timeout_in_seconds = 3
no_message_timeout_in_seconds = 60
sleep_between_keep_alive_checks_in_seconds = 30
```

The `node_connections` option configures the node (or multiple nodes) to which the Sidecar will connect and the parameters under which it will operate with that node.
@@ -72,6 +74,8 @@ The `node_connections` option configures the node (or multiple nodes) to which t
* `allow_partial_connection` - Determining whether the Sidecar will allow a partial connection to this node.
* `enable_logging` - This enables the logging of events from the node in question.
* `connection_timeout_in_seconds` - The total time before the connection request times out.
* `no_message_timeout_in_seconds` - Optional parameter that determines how long the connection can go without receiving any bytes before it is restarted. Defaults to 120.
* `sleep_between_keep_alive_checks_in_seconds` - Optional parameter that determines the interval at which the liveness of the connection is checked. Defaults to 60.
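The two keep-alive options act together as a simple watchdog: every `sleep_between_keep_alive_checks_in_seconds` the connection's liveness is checked, and if no bytes arrived within `no_message_timeout_in_seconds` the connection is restarted. A minimal sketch of the idea, with hypothetical names (not the Sidecar's actual implementation):

```rust
use std::time::{Duration, Instant};

// Hypothetical watchdog sketch: the check runs periodically and compares
// the time since the last received bytes against the no-message timeout.
fn should_restart(last_bytes_received_at: Instant, no_message_timeout: Duration) -> bool {
    last_bytes_received_at.elapsed() >= no_message_timeout
}

fn main() {
    let last_bytes_received_at = Instant::now();
    // Right after receiving data, the watchdog leaves the connection alone.
    assert!(!should_restart(last_bytes_received_at, Duration::from_secs(60)));
}
```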

### Storage

@@ -198,15 +202,19 @@ This information determines configuration for the Sidecar's `admin_server`. It i
* `max_concurrent_requests` - The maximum total number of simultaneous requests that can be made to the REST server.
* `max_requests_per_second` - The maximum total number of requests that can be made per second.

## Swagger documentation

Once Sidecar is running, you can access the Swagger documentation at `http://localhost:18888/swagger-ui/`. You will need to replace `localhost` with the IP address of the machine running the Sidecar application if you are running the Sidecar remotely. The Swagger documentation will allow you to test the REST API.

## Unit Testing the Sidecar Application

You can run included unit tests with the following command:
You can run included unit and integration tests with the following command:

```
cargo test
```

You can also run the integration and performance tests using the following command:
You can also run the performance tests using the following command:

```
cargo test -- --include-ignored
10 changes: 2 additions & 8 deletions listener/src/lib.rs
@@ -198,9 +198,7 @@ impl EventListener {
}

/// Spins up the connections and starts pushing data from node
///
/// * `is_empty_database` - if set to true, sidecar will connect to the node and fetch all the events the node has in it's cache.
pub async fn stream_aggregated_events(&mut self, is_empty_database: bool) -> Result<(), Error> {
pub async fn stream_aggregated_events(&mut self) -> Result<(), Error> {
EventListenerStatus::Preparing.log_status_for_event_listener(self);
let (last_event_id_for_filter, last_seen_event_id_sender) =
self.start_last_event_id_registry(self.node.ip_address.to_string(), self.node.sse_port);
@@ -223,7 +221,6 @@ impl EventListener {
match self
.do_connect(
last_event_id_for_filter.clone(),
is_empty_database,
last_seen_event_id_sender.clone(),
)
.await?
@@ -240,13 +237,11 @@ impl EventListener {
async fn do_connect(
&mut self,
last_event_id_for_filter: Arc<Mutex<HashMap<Filter, u32>>>,
is_empty_database: bool,
last_seen_event_id_sender: FilterWithEventId,
) -> Result<ConnectOutcome, Error> {
let connections = self
.build_connections(
last_event_id_for_filter.clone(),
is_empty_database,
last_seen_event_id_sender.clone(),
)
.await?;
@@ -324,7 +319,6 @@ impl EventListener {
async fn build_connections(
&mut self,
last_event_id_for_filter: Arc<Mutex<HashMap<Filter, u32>>>,
is_empty_database: bool,
last_seen_event_id_sender: FilterWithEventId,
) -> Result<HashMap<Filter, ConnectionManager>, Error> {
let filters = filters_from_version(self.node_build_version);
@@ -334,7 +328,7 @@ impl EventListener {
let guard = last_event_id_for_filter.lock().await;
for filter in filters {
let mut start_from_event_id = guard.get(&filter).copied();
if is_empty_database && start_from_event_id.is_none() {
if start_from_event_id.is_none() {
start_from_event_id = Some(0);
}
let connection = self
6 changes: 3 additions & 3 deletions sidecar/src/integration_tests.rs
@@ -448,7 +448,7 @@ async fn sidecar_should_use_start_from_if_database_is_empty() {
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn sidecar_should_not_use_start_from_if_database_is_not_empty() {
async fn sidecar_should_use_start_from_if_database_is_not_empty() {
let mut rng = TestRng::new();
let (
testing_config,
@@ -481,10 +481,10 @@ async fn sidecar_should_not_use_start_from_if_database_is_not_empty() {
stop_nodes_and_wait(vec![&mut node_mock]).await;

let events_received = tokio::join!(join_handle).0.unwrap();
assert_eq!(events_received.len(), 2);
//Should not have data from node cache
assert_eq!(events_received.len(), 3);
assert!(events_received.get(0).unwrap().contains("\"1.5.2\""));
assert!(events_received.get(1).unwrap().contains("\"BlockAdded\""));
assert!(events_received.get(2).unwrap().contains("\"BlockAdded\""));
}

#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
15 changes: 1 addition & 14 deletions sidecar/src/main.rs
@@ -106,7 +106,6 @@ async fn run(config: Config) -> Result<(), Error> {
let storage_config = config.storage.clone();
let database = build_database(&storage_config).await?;
let rest_server_handle = build_and_start_rest_server(&config, database.clone());
let is_empty_database = check_if_database_is_empty(database.clone()).await?;

// Task to manage incoming events from all three filters
let listening_task_handle = start_sse_processors(
@@ -115,7 +114,6 @@ async fn run(config: Config) -> Result<(), Error> {
sse_data_receivers,
database.clone(),
outbound_sse_data_sender.clone(),
is_empty_database,
);

let event_broadcasting_handle =
@@ -165,7 +163,6 @@ fn start_sse_processors(
sse_data_receivers: Vec<Receiver<SseEvent>>,
database: Database,
outbound_sse_data_sender: Sender<(SseData, Option<Filter>, Option<String>)>,
is_empty_database: bool,
) -> JoinHandle<Result<(), Error>> {
tokio::spawn(async move {
let mut join_handles = Vec::with_capacity(event_listeners.len());
@@ -177,9 +174,7 @@ fn start_sse_processors(
.zip(sse_data_receivers)
{
tokio::spawn(async move {
let res = event_listener
.stream_aggregated_events(is_empty_database)
.await;
let res = event_listener.stream_aggregated_events().await;
if let Err(e) = res {
let addr = event_listener.get_node_interface().ip_address.to_string();
error!("Disconnected from {}. Reason: {}", addr, e.to_string());
@@ -832,14 +827,6 @@ async fn start_single_threaded_events_consumer<
}
}

async fn check_if_database_is_empty(database: Database) -> Result<bool, Error> {
database
.get_number_of_events()
.await
.map(|i| i == 0)
.map_err(|e| Error::msg(format!("Error when checking if database is empty {:?}", e)))
}

fn count_error(reason: &str) {
metrics::ERROR_COUNTS
.with_label_values(&["main", reason])
4 changes: 2 additions & 2 deletions sidecar/src/performance_tests.rs
@@ -249,7 +249,7 @@ async fn performance_check(scenario: Scenario, duration: Duration, acceptable_la
.build();

tokio::spawn(async move {
let res = node_event_listener.stream_aggregated_events(false).await;
let res = node_event_listener.stream_aggregated_events().await;
if let Err(error) = res {
println!("Node listener Error: {}", error)
}
@@ -275,7 +275,7 @@ }
}
.build();
tokio::spawn(async move {
let res = sidecar_event_listener.stream_aggregated_events(false).await;
let res = sidecar_event_listener.stream_aggregated_events().await;
if let Err(error) = res {
println!("Sidecar listener Error: {}", error)
}
9 changes: 0 additions & 9 deletions sidecar/src/types/database.rs
@@ -21,15 +21,6 @@ pub enum Database {
PostgreSqlDatabaseWrapper(PostgreSqlDatabase),
}

impl Database {
pub async fn get_number_of_events(&self) -> Result<u64, DatabaseReadError> {
match self {
Database::SqliteDatabaseWrapper(db) => db.get_number_of_events().await,
Database::PostgreSqlDatabaseWrapper(db) => db.get_number_of_events().await,
}
}
}

/// Describes a reference for the writing interface of an 'Event Store' database.
/// There is a one-to-one relationship between each method and each event that can be received from the node.
/// Each method takes the `data` and `id` fields as well as the source IP address (useful for tying the node-specific `id` to the relevant node).
