diff --git a/event_sidecar/src/event_handling_service.rs b/event_sidecar/src/event_handling_service.rs index 71349878..d9b6d153 100644 --- a/event_sidecar/src/event_handling_service.rs +++ b/event_sidecar/src/event_handling_service.rs @@ -1,25 +1,23 @@ +use crate::{ + types::database::DatabaseWriteError, FinalitySignature, Step, TransactionAccepted, + TransactionProcessed, +}; use async_trait::async_trait; use casper_event_listener::SseEvent; use casper_event_types::{sse_data::SseData, Filter}; -use casper_types::{Block, BlockHash, ProtocolVersion}; -use derive_new::new; -use hex_fmt::HexFmt; -use metrics::{observe_error, sse::observe_contract_messages}; -use tokio::sync::mpsc::{channel as mpsc_channel, Receiver, Sender}; +use casper_types::{ + Block, BlockHash, EraId, ProtocolVersion, PublicKey, Timestamp, TransactionHash, +}; +use metrics::observe_error; +use tokio::sync::mpsc::Sender; use tracing::{debug, trace, warn}; - -use crate::{ - database::{postgresql_database::PostgreSqlDatabase, sqlite_database::SqliteDatabase}, - types::database::{DatabaseReader, DatabaseWriteError, DatabaseWriter}, - BlockAdded, Fault, FinalitySignature, Step, TransactionAccepted, TransactionExpired, - TransactionProcessed, +pub mod db_saving_event_handling_service; +pub mod no_db_event_handling_service; +pub use { + db_saving_event_handling_service::DbSavingEventHandlingService, + no_db_event_handling_service::NoDbEventHandlingService, }; -enum EventHandlingServiceWrapper { - PostgresEventHandlingService(DbSavingEventHandlingService), - SqliteEventHandlingService(DbSavingEventHandlingService), -} - #[async_trait] pub trait EventHandlingService { async fn handle_api_version(&self, version: ProtocolVersion, filter: Filter); @@ -28,369 +26,58 @@ pub trait EventHandlingService { &self, block_hash: BlockHash, block: Box, - id: u32, - source: String, - api_version: String, - network_name: String, - filter: Filter, + sse_event: SseEvent, ); async fn handle_transaction_accepted( &self, transaction_accepted: TransactionAccepted, - id: u32, - source: String, - api_version: String, - network_name: String, - filter: Filter, + sse_event: SseEvent, ); async fn handle_transaction_expired( &self, - transaction_accepted: TransactionExpired, - id: u32, - source: String, - api_version: String, - network_name: String, - filter: Filter, + transaction_hash: TransactionHash, + sse_event: SseEvent, ); async fn handle_transaction_processed( &self, transaction_processed: TransactionProcessed, - id: u32, - source: String, - api_version: String, - network_name: String, - filter: Filter, + sse_event: SseEvent, ); async fn handle_fault( &self, - fault: Fault, - id: u32, - source: String, - api_version: String, - network_name: String, - filter: Filter, + era_id: EraId, + timestamp: Timestamp, + public_key: PublicKey, + sse_event: SseEvent, ); - async fn handle_step( - &self, - step: Step, - id: u32, - source: String, - api_version: String, - network_name: String, - filter: Filter, - ); + async fn handle_step(&self, step: Step, sse_event: SseEvent); async fn handle_finality_signature( &self, - fs: FinalitySignature, - id: u32, - source: String, - api_version: String, - network_name: String, - filter: Filter, + finality_signature: FinalitySignature, + sse_event: SseEvent, ); async fn handle_shutdown(&self, sse_event: SseEvent); } -#[derive(new, Clone)] -pub struct DbSavingEventHandlingService { - outbound_sse_data_sender: Sender<(SseData, Option)>, - database: Db, -} - -#[async_trait] -impl EventHandlingService for DbSavingEventHandlingService -where - Db: DatabaseReader + DatabaseWriter + Clone + Send + Sync + 'static, -{ - async fn handle_api_version(&self, version: ProtocolVersion, filter: Filter) { - if let Err(error) = self - .outbound_sse_data_sender - .send((SseData::ApiVersion(version), Some(filter))) - .await - { - debug!( - "Error when sending to outbound_sse_data_sender. Error: {}", - error - ); - } - } - - async fn handle_block_added( - &self, - block_hash: BlockHash, - block: Box, - id: u32, - source: String, - api_version: String, - network_name: String, - filter: Filter, - ) { - let res = self - .database - .save_block_added( - BlockAdded::new(block_hash, block.clone()), - id, - source, - api_version, - network_name, - ) - .await; - handle_database_save_result( - "BlockAdded", - HexFmt(block_hash.inner()).to_string().as_str(), - res, - &self.outbound_sse_data_sender, - filter, - || SseData::BlockAdded { block, block_hash }, - ) - .await; - } - - async fn handle_transaction_accepted( - &self, - transaction_accepted: TransactionAccepted, - id: u32, - source: String, - api_version: String, - network_name: String, - filter: Filter, - ) { - let entity_identifier = transaction_accepted.identifier(); - let transaction = transaction_accepted.transaction(); - let res = self - .database - .save_transaction_accepted(transaction_accepted, id, source, api_version, network_name) - .await; - handle_database_save_result( - "TransactionAccepted", - &entity_identifier, - res, - &self.outbound_sse_data_sender, - filter, - || SseData::TransactionAccepted(transaction), - ) - .await; - } - - async fn handle_transaction_expired( - &self, - transaction_expired: TransactionExpired, - id: u32, - source: String, - api_version: String, - network_name: String, - filter: Filter, - ) { - let transaction_hash = transaction_expired.transaction_hash(); - let entity_identifier = transaction_expired.identifier(); - let res = self - .database - .save_transaction_expired( - transaction_expired, - id, - source.to_string(), - api_version, - network_name, - ) - .await; - handle_database_save_result( - "TransactionExpired", - &entity_identifier, - res, - &self.outbound_sse_data_sender, - filter, - || SseData::TransactionExpired { transaction_hash }, - ) - .await; - } - - async fn handle_transaction_processed( - &self, - transaction_processed: TransactionProcessed, - id: u32, - source: String, - api_version: String, - network_name: String, - filter: Filter, - ) { - let messages = transaction_processed.messages(); - let entity_identifier = transaction_processed.identifier(); - if !messages.is_empty() { - observe_contract_messages("all", messages.len()); - } - let res = self - .database - .save_transaction_processed( - transaction_processed, - id, - source.to_string(), - api_version, - network_name, - ) - .await; - if res.is_ok() && !messages.is_empty() { - observe_contract_messages("unique", messages.len()); - } - handle_database_save_result( - "TransactionProcessed", - &entity_identifier, - res, - &self.outbound_sse_data_sender, - filter, - || SseData::TransactionProcessed { - transaction_hash, - initiator_addr, - timestamp, - ttl, - block_hash, - execution_result, - messages, - }, - ) - .await; - } - - async fn handle_fault( - &self, - fault: Fault, - id: u32, - source: String, - api_version: String, - network_name: String, - filter: Filter, - ) { - let res = self - .database - .save_fault(fault.clone(), id, source, api_version, network_name) - .await; - - handle_database_save_result( - "Fault", - format!("{:#?}", fault).as_str(), - res, - &self.outbound_sse_data_sender, - filter, - || SseData::Fault { - era_id, - timestamp, - public_key, - }, - ) - .await; - } - - async fn handle_step( - &self, - step: Step, - id: u32, - source: String, - api_version: String, - network_name: String, - filter: Filter, - ) { - let res = self - .database - .save_step(step, id, source, api_version, network_name) - .await; - handle_database_save_result( - "Step", - format!("{}", step.era_id.value()).as_str(), - res, - &self.outbound_sse_data_sender, - filter, - || SseData::Step { - era_id, - execution_effects, - }, - ) - .await; - } - - async fn handle_finality_signature( - &self, - finality_signature: FinalitySignature, - id: u32, - source: String, - api_version: String, - network_name: String, - filter: Filter, - ) { - let res = self - .database - .save_finality_signature( - finality_signature.clone(), - id, - source.to_string(), - api_version, - network_name, - ) - .await; - handle_database_save_result( - "FinalitySignature", - "", - res, - &self.outbound_sse_data_sender, - filter, - || SseData::FinalitySignature(fs), - ) - .await; - } - - async fn handle_shutdown(&self, sse_event: SseEvent) { - warn!("Node ({}) is unavailable", sse_event.source.to_string()); - let res = self - .database - .save_shutdown( - sse_event.id, - sse_event.source.to_string(), - sse_event.api_version, - sse_event.network_name, - ) - .await; - match res { - Ok(_) | Err(DatabaseWriteError::UniqueConstraint(_)) => { - // We push to outbound on UniqueConstraint error because in sse_server we match shutdowns to outbounds based on the filter they came from to prevent duplicates. - // But that also means that we need to pass through all the Shutdown events so the sse_server can determine to which outbound filters they need to be pushed (we - // don't store in DB the information from which filter did shutdown came). - if let Err(error) = outbound_sse_data_sender - .send((SseData::Shutdown, Some(sse_event.inbound_filter))) - .await - { - debug!( - "Error when sending to outbound_sse_data_sender. Error: {}", - error - ); - } - } - Err(other_err) => { - count_error("db_save_error_shutdown"); - warn!(?other_err, "Unexpected error saving Shutdown") - } - } - } -} - -async fn handle_database_save_result( +async fn handle_database_save_result( entity_name: &str, entity_identifier: &str, - res: Result, + res: Result, outbound_sse_data_sender: &Sender<(SseData, Option)>, inbound_filter: Filter, - build_sse_data: F, -) where - F: FnOnce() -> SseData, -{ + sse_data: SseData, +) { match res { Ok(_) => { if let Err(error) = outbound_sse_data_sender - .send((build_sse_data(), Some(inbound_filter))) + .send((sse_data, Some(inbound_filter))) .await { debug!( diff --git a/event_sidecar/src/event_handling_service/db_saving_event_handling_service.rs b/event_sidecar/src/event_handling_service/db_saving_event_handling_service.rs new file mode 100644 index 00000000..f4bf3888 --- /dev/null +++ b/event_sidecar/src/event_handling_service/db_saving_event_handling_service.rs @@ -0,0 +1,326 @@ +use crate::{ + event_handling_service::count_error, + transaction_hash_to_identifier, + types::database::{DatabaseReader, DatabaseWriteError, DatabaseWriter}, + BlockAdded, Fault, FinalitySignature, Step, TransactionAccepted, TransactionExpired, + TransactionProcessed, +}; +use async_trait::async_trait; +use casper_event_listener::SseEvent; +use casper_event_types::{sse_data::SseData, Filter}; +use casper_types::{ + Block, BlockHash, EraId, ProtocolVersion, PublicKey, Timestamp, TransactionHash, +}; +use derive_new::new; +use hex_fmt::HexFmt; +use metrics::sse::observe_contract_messages; +use tokio::sync::mpsc::Sender; +use tracing::{debug, info, warn}; + +use super::{handle_database_save_result, EventHandlingService}; + +#[derive(new, Clone)] +pub struct DbSavingEventHandlingService { + outbound_sse_data_sender: Sender<(SseData, Option)>, + database: Db, + enable_event_logging: bool, +} + +#[async_trait] +impl EventHandlingService for DbSavingEventHandlingService +where + Db: DatabaseReader + DatabaseWriter + Clone + Send + Sync + 'static, +{ + async fn handle_api_version(&self, version: ProtocolVersion, filter: Filter) { + if let Err(error) = self + .outbound_sse_data_sender + .send((SseData::ApiVersion(version), Some(filter))) + .await + { + debug!( + "Error when sending to outbound_sse_data_sender. Error: {}", + error + ); + } + if self.enable_event_logging { + info!(%version, "API Version"); + } + } + + async fn handle_block_added( + &self, + block_hash: BlockHash, + block: Box, + sse_event: SseEvent, + ) { + if self.enable_event_logging { + let hex_block_hash = HexFmt(block_hash.inner()); + info!("Block Added: {:18}", hex_block_hash); + debug!("Block Added: {}", hex_block_hash); + } + let id = sse_event.id; + let source = sse_event.source.to_string(); + let api_version = sse_event.api_version; + let network_name = sse_event.network_name; + let filter = sse_event.inbound_filter; + let res = self + .database + .save_block_added( + BlockAdded::new(block_hash, block), //TODO maybe we could avoid these clones + id, + source, + api_version, + network_name, + ) + .await; + handle_database_save_result( + "BlockAdded", + HexFmt(block_hash.inner()).to_string().as_str(), + res, + &self.outbound_sse_data_sender, + filter, + sse_event.data, + ) + .await; + } + + async fn handle_transaction_accepted( + &self, + transaction_accepted: TransactionAccepted, + sse_event: SseEvent, + ) { + let entity_identifier = transaction_accepted.identifier(); + if self.enable_event_logging { + info!("Transaction Accepted: {:18}", entity_identifier); + debug!("Transaction Accepted: {}", entity_identifier); + } + let id = sse_event.id; + let source = sse_event.source.to_string(); + let api_version = sse_event.api_version; + let network_name = sse_event.network_name; + let filter = sse_event.inbound_filter; + let res = self + .database + .save_transaction_accepted(transaction_accepted, id, source, api_version, network_name) + .await; + handle_database_save_result( + "TransactionAccepted", + &entity_identifier, + res, + &self.outbound_sse_data_sender, + filter, + sse_event.data, + ) + .await; + } + + async fn handle_transaction_expired( + &self, + transaction_hash: TransactionHash, + sse_event: SseEvent, + ) { + let entity_identifier = transaction_hash_to_identifier(&transaction_hash); + if self.enable_event_logging { + info!("Transaction Expired: {:18}", entity_identifier); + debug!("Transaction Expired: {}", entity_identifier); + } + let id = sse_event.id; + let source = sse_event.source.to_string(); + let api_version = sse_event.api_version; + let network_name = sse_event.network_name; + let filter = sse_event.inbound_filter; + let res = self + .database + .save_transaction_expired( + TransactionExpired::new(transaction_hash), + id, + source.to_string(), + api_version, + network_name, + ) + .await; + handle_database_save_result( + "TransactionExpired", + &entity_identifier, + res, + &self.outbound_sse_data_sender, + filter, + sse_event.data, + ) + .await; + } + + async fn handle_transaction_processed( + &self, + transaction_processed: TransactionProcessed, + sse_event: SseEvent, + ) { + let entity_identifier = transaction_processed.identifier(); + if self.enable_event_logging { + info!("Transaction Processed: {:18}", entity_identifier); + debug!("Transaction Processed: {}", entity_identifier); + } + let id = sse_event.id; + let source = sse_event.source.to_string(); + let api_version = sse_event.api_version; + let network_name = sse_event.network_name; + let filter = sse_event.inbound_filter; + let messages_len = transaction_processed.messages().len(); + + if messages_len > 0 { + observe_contract_messages("all", messages_len); + } + let res = self + .database + .save_transaction_processed( + transaction_processed, + id, + source.to_string(), + api_version, + network_name, + ) + .await; + if res.is_ok() && messages_len > 0 { + observe_contract_messages("unique", messages_len); + } + handle_database_save_result( + "TransactionProcessed", + &entity_identifier, + res, + &self.outbound_sse_data_sender, + filter, + sse_event.data, + ) + .await; + } + + async fn handle_fault( + &self, + era_id: EraId, + timestamp: Timestamp, + public_key: PublicKey, + sse_event: SseEvent, + ) { + let id = sse_event.id; + let source = sse_event.source.to_string(); + let api_version = sse_event.api_version; + let network_name = sse_event.network_name; + let filter = sse_event.inbound_filter; + let fault_identifier = format!("{}-{}", era_id.value(), public_key); + let fault = Fault::new(era_id, public_key, timestamp); + warn!(%fault, "Fault reported"); + let res = self + .database + .save_fault(fault, id, source, api_version, network_name) + .await; + + handle_database_save_result( + "Fault", + &fault_identifier, + res, + &self.outbound_sse_data_sender, + filter, + sse_event.data, + ) + .await; + } + + async fn handle_step(&self, step: Step, sse_event: SseEvent) { + let era_id = step.era_id; + let step_identifier = format!("{}", era_id.value()); + if self.enable_event_logging { + info!("Step at era: {}", step_identifier); + } + + let id = sse_event.id; + let source = sse_event.source.to_string(); + let api_version = sse_event.api_version; + let network_name = sse_event.network_name; + let filter = sse_event.inbound_filter; + let res = self + .database + .save_step(step, id, source, api_version, network_name) + .await; + handle_database_save_result( + "Step", + step_identifier.as_str(), + res, + &self.outbound_sse_data_sender, + filter, + sse_event.data, + ) + .await; + } + + async fn handle_finality_signature( + &self, + finality_signature: FinalitySignature, + sse_event: SseEvent, + ) { + if self.enable_event_logging { + debug!( + "Finality Signature: {} for {}", + finality_signature.signature(), + finality_signature.block_hash() + ); + } + let id = sse_event.id; + let source = sse_event.source.to_string(); + let api_version = sse_event.api_version; + let network_name = sse_event.network_name; + let filter = sse_event.inbound_filter; + let res = self + .database + .save_finality_signature( + finality_signature.clone(), + id, + source, + api_version, + network_name, + ) + .await; + handle_database_save_result( + "FinalitySignature", + "", + res, + &self.outbound_sse_data_sender, + filter, + sse_event.data, + ) + .await; + } + + async fn handle_shutdown(&self, sse_event: SseEvent) { + warn!("Node ({}) is unavailable", sse_event.source.to_string()); + let res = self + .database + .save_shutdown( + sse_event.id, + sse_event.source.to_string(), + sse_event.api_version, + sse_event.network_name, + ) + .await; + match res { + Ok(_) | Err(DatabaseWriteError::UniqueConstraint(_)) => { + // We push to outbound on UniqueConstraint error because in sse_server we match shutdowns to outbounds based on the filter they came from to prevent duplicates. + // But that also means that we need to pass through all the Shutdown events so the sse_server can determine to which outbound filters they need to be pushed (we + // don't store in DB the information from which filter did shutdown came). + if let Err(error) = self + .outbound_sse_data_sender + .send((SseData::Shutdown, Some(sse_event.inbound_filter))) + .await + { + debug!( + "Error when sending to outbound_sse_data_sender. Error: {}", + error + ); + } + } + Err(other_err) => { + count_error("db_save_error_shutdown"); + warn!(?other_err, "Unexpected error saving Shutdown") + } + } + } +} diff --git a/event_sidecar/src/event_handling_service/no_db_event_handling_service.rs b/event_sidecar/src/event_handling_service/no_db_event_handling_service.rs new file mode 100644 index 00000000..85a7d4a4 --- /dev/null +++ b/event_sidecar/src/event_handling_service/no_db_event_handling_service.rs @@ -0,0 +1,215 @@ +use crate::{ + event_handling_service::handle_database_save_result, transaction_hash_to_identifier, Fault, + FinalitySignature, Step, TransactionAccepted, TransactionProcessed, +}; +use async_trait::async_trait; +use casper_event_listener::SseEvent; +use casper_event_types::{sse_data::SseData, Filter}; +use casper_types::{ + Block, BlockHash, EraId, ProtocolVersion, PublicKey, Timestamp, TransactionHash, +}; +use derive_new::new; +use hex_fmt::HexFmt; +use metrics::sse::observe_contract_messages; +use tokio::sync::mpsc::Sender; +use tracing::{debug, info, warn}; + +use super::EventHandlingService; + +#[derive(new, Clone)] +pub struct NoDbEventHandlingService { + outbound_sse_data_sender: Sender<(SseData, Option)>, + enable_event_logging: bool, +} + +#[async_trait] +impl EventHandlingService for NoDbEventHandlingService { + async fn handle_api_version(&self, version: ProtocolVersion, filter: Filter) { + if let Err(error) = self + .outbound_sse_data_sender + .send((SseData::ApiVersion(version), Some(filter))) + .await + { + debug!( + "Error when sending to outbound_sse_data_sender. Error: {}", + error + ); + } + if self.enable_event_logging { + info!(%version, "API Version"); + } + } + + async fn handle_block_added( + &self, + block_hash: BlockHash, + _block: Box, + sse_event: SseEvent, + ) { + if self.enable_event_logging { + let hex_block_hash = HexFmt(block_hash.inner()); + info!("Block Added: {:18}", hex_block_hash); + debug!("Block Added: {}", hex_block_hash); + } + let filter = sse_event.inbound_filter; + handle_database_save_result( + "BlockAdded", + HexFmt(block_hash.inner()).to_string().as_str(), + Ok(()), + &self.outbound_sse_data_sender, + filter, + sse_event.data, + ) + .await; + } + + async fn handle_transaction_accepted( + &self, + transaction_accepted: TransactionAccepted, + sse_event: SseEvent, + ) { + let entity_identifier = transaction_accepted.identifier(); + if self.enable_event_logging { + info!("Transaction Accepted: {:18}", entity_identifier); + debug!("Transaction Accepted: {}", entity_identifier); + } + let filter = sse_event.inbound_filter; + handle_database_save_result( + "TransactionAccepted", + &entity_identifier, + Ok(()), + &self.outbound_sse_data_sender, + filter, + sse_event.data, + ) + .await; + } + + async fn handle_transaction_expired( + &self, + transaction_hash: TransactionHash, + sse_event: SseEvent, + ) { + let entity_identifier = transaction_hash_to_identifier(&transaction_hash); + if self.enable_event_logging { + info!("Transaction Expired: {:18}", entity_identifier); + debug!("Transaction Expired: {}", entity_identifier); + } + let filter = sse_event.inbound_filter; + handle_database_save_result( + "TransactionExpired", + &entity_identifier, + Ok(()), + &self.outbound_sse_data_sender, + filter, + sse_event.data, + ) + .await; + } + + async fn handle_transaction_processed( + &self, + transaction_processed: TransactionProcessed, + sse_event: SseEvent, + ) { + let entity_identifier = transaction_processed.identifier(); + if self.enable_event_logging { + info!("Transaction Processed: {:18}", entity_identifier); + debug!("Transaction Processed: {}", entity_identifier); + } + let filter = sse_event.inbound_filter; + let messages_len = transaction_processed.messages().len(); + + if messages_len > 0 { + observe_contract_messages("all", messages_len); + } + handle_database_save_result( + "TransactionProcessed", + &entity_identifier, + Ok(()), + &self.outbound_sse_data_sender, + filter, + sse_event.data, + ) + .await; + } + + async fn handle_fault( + &self, + era_id: EraId, + timestamp: Timestamp, + public_key: PublicKey, + sse_event: SseEvent, + ) { + let filter = sse_event.inbound_filter; + let fault_identifier = format!("{}-{}", era_id.value(), public_key); + let fault = Fault::new(era_id, public_key, timestamp); + warn!(%fault, "Fault reported"); + + handle_database_save_result( + "Fault", + &fault_identifier, + Ok(()), + &self.outbound_sse_data_sender, + filter, + sse_event.data, + ) + .await; + } + + async fn handle_step(&self, step: Step, sse_event: SseEvent) { + let era_id = step.era_id; + let step_identifier = format!("{}", era_id.value()); + if self.enable_event_logging { + info!("Step at era: {}", step_identifier); + } + let filter = sse_event.inbound_filter; + handle_database_save_result( + "Step", + step_identifier.as_str(), + Ok(()), + &self.outbound_sse_data_sender, + filter, + sse_event.data, + ) + .await; + } + + async fn handle_finality_signature( + &self, + finality_signature: FinalitySignature, + sse_event: SseEvent, + ) { + if self.enable_event_logging { + debug!( + "Finality Signature: {} for {}", + finality_signature.signature(), + finality_signature.block_hash() + ); + } + let filter = sse_event.inbound_filter; + handle_database_save_result( + "FinalitySignature", + "", + Ok(()), + &self.outbound_sse_data_sender, + filter, + sse_event.data, + ) + .await; + } + + async fn handle_shutdown(&self, sse_event: SseEvent) { + warn!("Node ({}) is unavailable", sse_event.source.to_string()); + if let Err(error) = self + .outbound_sse_data_sender + .send((SseData::Shutdown, Some(sse_event.inbound_filter))) + .await + { + debug!( + "Error when sending to outbound_sse_data_sender. Error: {}", + error + ); + } + } +} diff --git a/event_sidecar/src/lib.rs b/event_sidecar/src/lib.rs index 5327d491..92623ef1 100644 --- a/event_sidecar/src/lib.rs +++ b/event_sidecar/src/lib.rs @@ -24,10 +24,7 @@ use crate::types::config::LegacySseApiTag; use crate::{ event_stream_server::{Config as SseConfig, EventStreamServer}, rest_server::run_server as start_rest_server, - types::{ - database::{DatabaseWriteError, DatabaseWriter}, - sse_events::*, - }, + types::sse_events::*, }; use anyhow::{Context, Error}; use api_version_manager::{ApiVersionManager, GuardedApiVersionManager}; @@ -35,19 +32,16 @@ use casper_event_listener::{ EventListener, EventListenerBuilder, NodeConnectionInterface, SseEvent, }; use casper_event_types::{sse_data::SseData, Filter}; -use casper_types::ProtocolVersion; -use event_handling_service::{DbSavingEventHandlingService, EventHandlingService}; +use event_handling_service::{ + DbSavingEventHandlingService, EventHandlingService, NoDbEventHandlingService, +}; use futures::future::join_all; -use hex_fmt::HexFmt; -use metrics::observe_error; -use metrics::sse::observe_contract_messages; use tokio::{ sync::mpsc::{channel as mpsc_channel, Receiver, Sender}, task::JoinHandle, time::sleep, }; -use tracing::{debug, error, info, trace, warn}; -use types::database::DatabaseReader; +use tracing::{error, info}; #[cfg(feature = "additional-metrics")] use utils::start_metrics_thread; @@ -63,7 +57,10 @@ pub use types::{ const DEFAULT_CHANNEL_SIZE: usize = 1000; -pub async fn run(config: SseEventServerConfig, database: Database) -> Result { +pub async fn run( + config: SseEventServerConfig, + maybe_database: Option, +) -> Result { validate_config(&config)?; let (event_listeners, sse_data_receivers) = build_event_listeners(&config)?; // This channel allows SseData to be sent from multiple connected nodes to the single EventStreamServer. @@ -76,7 +73,7 @@ pub async fn run(config: SseEventServerConfig, database: Database) -> Result, event_listeners: Vec, sse_data_receivers: Vec>, - database: Database, + maybe_database: Option, outbound_sse_data_sender: Sender<(SseData, Option)>, ) -> JoinHandle> { tokio::spawn(async move { @@ -150,7 +147,7 @@ fn start_sse_processors( } }); let join_handle = spawn_sse_processor( - &database, + maybe_database.clone(), sse_data_receiver, &outbound_sse_data_sender, connection_config, @@ -176,32 +173,48 @@ fn start_sse_processors( } fn spawn_sse_processor( - database: &Database, + maybe_database: Option, sse_data_receiver: Receiver, outbound_sse_data_sender: &Sender<(SseData, Option)>, connection_config: Connection, api_version_manager: &std::sync::Arc>, ) -> JoinHandle> { - match database.clone() { - Database::SqliteDatabaseWrapper(db) => { - let event_handling_service = - DbSavingEventHandlingService::new(outbound_sse_data_sender.clone(), db.clone()); + match maybe_database { + Some(Database::SqliteDatabaseWrapper(db)) => { + let event_handling_service = DbSavingEventHandlingService::new( + outbound_sse_data_sender.clone(), + db, + connection_config.enable_logging, + ); tokio::spawn(sse_processor( sse_data_receiver, event_handling_service, false, - connection_config.enable_logging, api_version_manager.clone(), )) } - Database::PostgreSqlDatabaseWrapper(db) => { - let event_handling_service = - DbSavingEventHandlingService::new(outbound_sse_data_sender.clone(), db.clone()); + Some(Database::PostgreSqlDatabaseWrapper(db)) => { + let event_handling_service = DbSavingEventHandlingService::new( + outbound_sse_data_sender.clone(), + db, + connection_config.enable_logging, + ); tokio::spawn(sse_processor( sse_data_receiver, event_handling_service, true, + api_version_manager.clone(), + )) + } + None => { + let event_handling_service = NoDbEventHandlingService::new( + outbound_sse_data_sender.clone(), connection_config.enable_logging, + ); + tokio::spawn(sse_processor( + sse_data_receiver, + event_handling_service, + true, api_version_manager.clone(), )) } @@ -294,14 +307,14 @@ async fn flatten_handle(handle: JoinHandle>) -> Result( sse_event: SseEvent, event_handling_service: EHS, - enable_event_logging: bool, api_version_manager: GuardedApiVersionManager, ) { - match sse_event.data { + match &sse_event.data { SseData::SidecarVersion(_) => { //Do nothing -> the inbound shouldn't produce this endpoint, it can be only produced by sidecar to the outbound } SseData::ApiVersion(version) => { + let version = *version; let mut manager_guard = api_version_manager.lock().await; let changed_newest_version = manager_guard.store_version(version); if changed_newest_version { @@ -310,62 +323,21 @@ async fn handle_single_event( .await; } drop(manager_guard); - if enable_event_logging { - info!(%version, "API Version"); - } } SseData::BlockAdded { block, block_hash } => { - if enable_event_logging { - let hex_block_hash = HexFmt(block_hash.inner()); - info!("Block Added: {:18}", hex_block_hash); - debug!("Block Added: {}", hex_block_hash); - } event_handling_service - .handle_block_added( - block_hash, - block, - sse_event.id, - sse_event.source.to_string(), - sse_event.api_version, - sse_event.network_name, - sse_event.inbound_filter, - ) + .handle_block_added(*block_hash, block.clone(), sse_event) .await; } SseData::TransactionAccepted(transaction) => { let transaction_accepted = TransactionAccepted::new(transaction.clone()); - let entity_identifier = transaction_accepted.identifier(); - if enable_event_logging { - info!("Transaction Accepted: {:18}", entity_identifier); - debug!("Transaction Accepted: {}", entity_identifier); - } event_handling_service - .handle_transaction_accepted( - transaction_accepted, - sse_event.id, - sse_event.source.to_string(), - sse_event.api_version, - sse_event.network_name, - sse_event.inbound_filter, - ) + .handle_transaction_accepted(transaction_accepted, sse_event) .await; } SseData::TransactionExpired { transaction_hash } => { - let transaction_expired = TransactionExpired::new(transaction_hash); - let entity_identifier = transaction_expired.identifier(); - if enable_event_logging { - info!("Transaction Expired: {:18}", entity_identifier); - debug!("Transaction Expired: {}", entity_identifier); - } event_handling_service - .handle_transaction_expired( - transaction_expired, - sse_event.id, - sse_event.source.to_string(), - sse_event.api_version, - sse_event.network_name, - sse_event.inbound_filter, - ) + .handle_transaction_expired(*transaction_hash, sse_event) .await; } SseData::TransactionProcessed { @@ -380,26 +352,14 @@ async fn handle_single_event( let transaction_processed = TransactionProcessed::new( transaction_hash.clone(), initiator_addr.clone(), - timestamp, - ttl, + *timestamp, + *ttl, block_hash.clone(), execution_result.clone(), messages.clone(), ); - let entity_identifier = transaction_processed.identifier(); - if enable_event_logging { - info!("Transaction Processed: {:18}", entity_identifier); - debug!("Transaction Processed: {}", entity_identifier); - } event_handling_service - .handle_transaction_processed( - transaction_processed, - sse_event.id, - sse_event.source.to_string(), - sse_event.api_version, - sse_event.network_name, - sse_event.inbound_filter, - ) + .handle_transaction_processed(transaction_processed, sse_event) .await; } SseData::Fault { @@ -407,57 +367,22 @@ async fn handle_single_event( timestamp, public_key, } => { - let fault = Fault::new(era_id, public_key.clone(), timestamp); - warn!(%fault, "Fault reported"); event_handling_service - .handle_fault( - fault, - sse_event.id, - sse_event.source.to_string(), - sse_event.api_version, - sse_event.network_name, - sse_event.inbound_filter, - ) + .handle_fault(*era_id, *timestamp, public_key.clone(), sse_event) .await; } SseData::FinalitySignature(fs) => { - if enable_event_logging { - debug!( - "Finality Signature: {} for {}", - fs.signature(), - fs.block_hash() - ); - } let finality_signature = FinalitySignature::new(fs.clone()); event_handling_service - .handle_finality_signature( - finality_signature, - sse_event.id, - sse_event.source.to_string(), - sse_event.api_version, - sse_event.network_name, - sse_event.inbound_filter, - ) + .handle_finality_signature(finality_signature, sse_event) .await; } SseData::Step { era_id, execution_effects, } => { - let step = Step::new(era_id, execution_effects.clone()); - if enable_event_logging { - info!("Step at era: {}", era_id.value()); - } - event_handling_service - .handle_step( - step, - sse_event.id, - sse_event.source.to_string(), - sse_event.api_version, - sse_event.network_name, - sse_event.inbound_filter, - ) - .await; + let step = Step::new(*era_id, execution_effects.clone()); + event_handling_service.handle_step(step, sse_event).await; } SseData::Shutdown => event_handling_service.handle_shutdown(sse_event).await, } @@ -467,7 +392,6 @@ async fn sse_processor, event_handling_service: EHS, database_supports_multithreaded_processing: bool, - enable_event_logging: bool, api_version_manager: GuardedApiVersionManager, ) -> Result<(), Error> { #[cfg(feature = "additional-metrics")] @@ -477,7 +401,6 @@ async fn sse_processor, event_handling_service: EHS, api_version_manager: GuardedApiVersionManager, - enable_event_logging: bool, #[cfg(feature = "additional-metrics")] metrics_sender: Sender<()>, ) { tokio::spawn(async move { @@ -509,7 +430,6 @@ fn handle_events_in_thread( mut inbound_sse_data_receiver: Receiver, event_handling_service: EHS, - enable_event_logging: bool, api_version_manager: GuardedApiVersionManager, #[cfg(feature = "additional-metrics")] metrics_sender: Sender<()>, ) { @@ -541,7 +460,6 @@ async fn start_multi_threaded_events_consumer< rx, event_handling_service.clone(), api_version_manager.clone(), - enable_event_logging, #[cfg(feature = "additional-metrics")] metrics_sender.clone(), ); @@ -566,7 +484,6 @@ async fn start_single_threaded_events_consumer< >( mut inbound_sse_data_receiver: Receiver, event_handling_service: EHS, - enable_event_logging: bool, api_version_manager: GuardedApiVersionManager, #[cfg(feature = "additional-metrics")] metrics_sender: Sender<()>, ) { @@ -574,7 +491,6 @@ async fn start_single_threaded_events_consumer< handle_single_event( sse_event, event_handling_service.clone(), - enable_event_logging, api_version_manager.clone(), ) .await; diff --git a/event_sidecar/src/tests/integration_tests.rs b/event_sidecar/src/tests/integration_tests.rs index 188a258f..6c1c3603 100644 --- a/event_sidecar/src/tests/integration_tests.rs +++ b/event_sidecar/src/tests/integration_tests.rs @@ -47,7 +47,7 @@ async fn should_not_allow_zero_max_attempts() { .expect("database should start"); let shutdown_error = run( testing_config.inner(), - Database::SqliteDatabaseWrapper(sqlite_database), + Some(Database::SqliteDatabaseWrapper(sqlite_database)), ) .await .expect_err("Sidecar should return an Err on shutdown"); diff --git a/event_sidecar/src/types/config.rs b/event_sidecar/src/types/config.rs index d6452084..e1f4036e 100644 --- a/event_sidecar/src/types/config.rs +++ b/event_sidecar/src/types/config.rs @@ -64,9 +64,10 @@ impl Default for SseEventServerConfig { #[cfg(any(feature = "testing", test))] impl SseEventServerConfig { pub fn default_no_persistence() -> Self { - let mut default = Self::default(); - default.disable_event_persistence = Some(true); - default + Self { + disable_event_persistence: Some(true), + ..Default::default() + } } } diff --git a/event_sidecar/src/types/sse_events.rs b/event_sidecar/src/types/sse_events.rs index 7460f83c..0f39b3f9 100644 --- a/event_sidecar/src/types/sse_events.rs +++ b/event_sidecar/src/types/sse_events.rs @@ -1,6 +1,5 @@ #[cfg(test)] use casper_types::ChainNameDigest; -use casper_types::FinalitySignature as FinSig; use casper_types::{ contract_messages::Messages, execution::ExecutionResult, AsymmetricType, Block, BlockHash, EraId, InitiatorAddr, ProtocolVersion, PublicKey, TimeDiff, Timestamp, Transaction, @@ -12,6 +11,7 @@ use casper_types::{ testing::TestRng, TestBlockBuilder, TestBlockV1Builder, }; +use casper_types::{FinalitySignature as FinSig, Signature}; use derive_new::new; use hex::ToHex; #[cfg(test)] @@ -224,7 +224,7 @@ impl TransactionExpired { } } pub fn transaction_hash(&self) -> TransactionHash { - self.transaction_hash.clone() + self.transaction_hash } #[cfg(test)] @@ -309,6 +309,14 @@ impl FinalitySignature { *self.0.clone() } + pub fn signature(&self) -> &Signature { + self.0.signature() + } + + pub fn block_hash(&self) -> &BlockHash { + self.0.block_hash() + } + pub fn hex_encoded_block_hash(&self) -> String { hex::encode(self.0.block_hash().inner()) } @@ -341,7 +349,7 @@ impl Step { } } -fn transaction_hash_to_identifier(transaction_hash: &TransactionHash) -> String { +pub fn transaction_hash_to_identifier(transaction_hash: &TransactionHash) -> String { match transaction_hash { TransactionHash::Deploy(deploy) => hex::encode(deploy.inner()), TransactionHash::V1(transaction) => hex::encode(transaction.inner()), diff --git a/event_sidecar/src/utils.rs b/event_sidecar/src/utils.rs index aab25e98..97ee4115 100644 --- a/event_sidecar/src/utils.rs +++ b/event_sidecar/src/utils.rs @@ -447,6 +447,6 @@ pub mod tests { run_rest_server(rest_api_server_config, database_for_rest_api).await }); } - run(sse_config, database).await + run(sse_config, Some(database)).await } } diff --git a/sidecar/src/component.rs b/sidecar/src/component.rs index 1a311094..d878be6e 100644 --- a/sidecar/src/component.rs +++ b/sidecar/src/component.rs @@ -89,16 +89,26 @@ impl Component for SseServerComponent { &self, config: &SidecarConfig, ) -> Result>>, ComponentError> { - if let (Some(storage_config), Some(database), Some(sse_server_config)) = - (&config.storage, &self.maybe_database, &config.sse_server) + if let (maybe_database, Some(sse_server_config)) = + (&self.maybe_database, &config.sse_server) { if sse_server_config.enable_server { - let database = - database.acquire().await.as_ref().map_err(|db_err| { - ComponentError::runtime_error(self.name(), db_err.into()) - })?; + let maybe_database = if let Some(lazy_database_wrapper) = maybe_database { + let database = + lazy_database_wrapper + .acquire() + .await + .clone() + .map_err(|db_err| { + ComponentError::runtime_error(self.name(), (&db_err).into()) + })?; + Some(database) + } else { + None + }; + // If sse server is configured, both storage config and database must be "Some" here. This should be ensured by prior validation. - let future = run_sse_sidecar(sse_server_config.clone(), database.clone()) + let future = run_sse_sidecar(sse_server_config.clone(), maybe_database) .map(|res| res.map_err(|e| ComponentError::runtime_error(self.name(), e))); Ok(Some(Box::pin(future))) } else { @@ -240,12 +250,12 @@ mod tests { }; #[tokio::test] - async fn given_sse_server_component_when_no_db_should_return_none() { + async fn given_sse_server_component_when_no_db_but_config_defined_should_return_some() { let component = SseServerComponent::new(None); let config = all_components_all_enabled(); let res = component.prepare_component_task(&config).await; assert!(res.is_ok()); - assert!(res.unwrap().is_none()); + assert!(res.unwrap().is_some()); } #[tokio::test]