diff --git a/crates/pathfinder/src/state/sync.rs b/crates/pathfinder/src/state/sync.rs index dbebfbdf74..7070d6c5e4 100644 --- a/crates/pathfinder/src/state/sync.rs +++ b/crates/pathfinder/src/state/sync.rs @@ -25,7 +25,7 @@ use pathfinder_ethereum::{EthereumApi, EthereumStateUpdate}; use pathfinder_merkle_tree::contract_state::update_contract_state; use pathfinder_merkle_tree::{ClassCommitmentTree, StorageCommitmentTree}; use pathfinder_rpc::v02::types::syncing::{self, NumberedBlock, Syncing}; -use pathfinder_rpc::{Notifications, PendingData, SyncState, TopicBroadcasters}; +use pathfinder_rpc::{Notifications, PendingData, Reorg, SyncState, TopicBroadcasters}; use pathfinder_storage::{Connection, Storage, Transaction, TransactionBehavior}; use primitive_types::H160; use starknet_gateway_client::GatewayApi; @@ -603,7 +603,7 @@ async fn consumer( } Reorg(reorg_tail) => { tracing::trace!("Reorg L2 state to block {}", reorg_tail); - l2_reorg(&mut db_conn, reorg_tail) + l2_reorg(&mut db_conn, reorg_tail, &mut notifications) .await .with_context(|| format!("Reorg L2 state to {reorg_tail:?}"))?; @@ -1011,7 +1011,11 @@ async fn l2_update( Ok(()) } -async fn l2_reorg(connection: &mut Connection, reorg_tail: BlockNumber) -> anyhow::Result<()> { +async fn l2_reorg( + connection: &mut Connection, + reorg_tail: BlockNumber, + notifications: &mut Notifications, +) -> anyhow::Result<()> { tokio::task::block_in_place(move || { let transaction = connection .transaction_with_behavior(TransactionBehavior::Immediate) @@ -1023,6 +1027,15 @@ async fn l2_reorg(connection: &mut Connection, reorg_tail: BlockNumber) -> anyho .context("Latest block number is none during reorg")? .0; + let reorg_tail_hash = transaction + .block_hash(reorg_tail.into()) + .context("Fetching first block hash")? + .context("Expected first block hash to exist")?; + let head_hash = transaction + .block_hash(head.into()) + .context("Fetching last block hash")? + .context("Expected last block hash to exist")?; + transaction .increment_reorg_counter() .context("Incrementing reorg counter")?; @@ -1076,7 +1089,26 @@ async fn l2_reorg(connection: &mut Connection, reorg_tail: BlockNumber) -> anyho } } - transaction.commit().context("Commit database transaction") + transaction + .commit() + .context("Commit database transaction")?; + + notifications + .reorgs + .send( + Reorg { + first_block_number: reorg_tail, + first_block_hash: reorg_tail_hash, + last_block_number: head, + last_block_hash: head_hash, + } + .into(), + ) + // Ignore errors in case nobody is listening. New listeners may subscribe in the + // future. + .ok(); + + Ok(()) }) } diff --git a/crates/pathfinder/src/state/sync/l2.rs b/crates/pathfinder/src/state/sync/l2.rs index 7f0a73faee..c7b1d3abdf 100644 --- a/crates/pathfinder/src/state/sync/l2.rs +++ b/crates/pathfinder/src/state/sync/l2.rs @@ -1123,7 +1123,6 @@ async fn reorg( #[cfg(test)] mod tests { - mod sync { use std::sync::LazyLock; diff --git a/crates/rpc/src/dto/block.rs b/crates/rpc/src/dto/block.rs index 635e297186..8c1db353df 100644 --- a/crates/rpc/src/dto/block.rs +++ b/crates/rpc/src/dto/block.rs @@ -2,6 +2,7 @@ use pathfinder_common::{GasPrice, L1DataAvailabilityMode}; use serde::de::Error; use super::serialize::SerializeStruct; +use crate::Reorg; #[derive(Debug)] pub struct BlockHeader<'a>(pub &'a pathfinder_common::BlockHeader); @@ -135,3 +136,23 @@ impl crate::dto::serialize::SerializeForVersion for ResourcePrice { serializer.end() } } + +impl crate::dto::serialize::SerializeForVersion for Reorg { + fn serialize( + &self, + serializer: super::serialize::Serializer, + ) -> Result { + let mut serializer = serializer.serialize_struct()?; + serializer.serialize_field("first_block_number", &self.first_block_number.get())?; + serializer.serialize_field( + "first_block_hash", + &crate::dto::Felt(&self.first_block_hash.0), + )?; + serializer.serialize_field("last_block_number", &self.last_block_number.get())?; + serializer.serialize_field( + "last_block_hash", + &crate::dto::Felt(&self.last_block_hash.0), + )?; + serializer.end() + } +} diff --git a/crates/rpc/src/jsonrpc.rs b/crates/rpc/src/jsonrpc.rs index a002166915..105b885774 100644 --- a/crates/rpc/src/jsonrpc.rs +++ b/crates/rpc/src/jsonrpc.rs @@ -7,11 +7,18 @@ pub mod websocket; use std::sync::Arc; pub use error::RpcError; +use pathfinder_common::{BlockHash, BlockNumber}; pub use request::RpcRequest; pub use response::RpcResponse; #[cfg(test)] pub use router::handle_json_rpc_socket; -pub use router::{rpc_handler, RpcRouter, RpcRouterBuilder, RpcSubscriptionFlow}; +pub use router::{ + rpc_handler, + RpcRouter, + RpcRouterBuilder, + RpcSubscriptionFlow, + SubscriptionMessage, +}; use tokio::sync::broadcast; #[derive(Debug, PartialEq, Clone)] @@ -33,11 +40,24 @@ impl RequestId { #[derive(Debug, Clone)] pub struct Notifications { pub block_headers: broadcast::Sender>, + pub reorgs: broadcast::Sender>, +} + +#[derive(Debug, Clone)] +pub struct Reorg { + pub first_block_number: BlockNumber, + pub first_block_hash: BlockHash, + pub last_block_number: BlockNumber, + pub last_block_hash: BlockHash, } impl Default for Notifications { fn default() -> Self { let (block_headers, _) = broadcast::channel(1024); - Self { block_headers } + let (reorgs, _) = broadcast::channel(1024); + Self { + block_headers, + reorgs, + } } } diff --git a/crates/rpc/src/jsonrpc/router.rs b/crates/rpc/src/jsonrpc/router.rs index 5982822300..a864fca40f 100644 --- a/crates/rpc/src/jsonrpc/router.rs +++ b/crates/rpc/src/jsonrpc/router.rs @@ -7,7 +7,7 @@ use axum::response::IntoResponse; use futures::{Future, FutureExt, StreamExt}; use http::HeaderValue; use method::RpcMethodEndpoint; -pub use subscription::{handle_json_rpc_socket, RpcSubscriptionFlow}; +pub use subscription::{handle_json_rpc_socket, RpcSubscriptionFlow, SubscriptionMessage}; use subscription::{split_ws, RpcSubscriptionEndpoint}; use crate::context::RpcContext; diff --git a/crates/rpc/src/jsonrpc/router/subscription.rs b/crates/rpc/src/jsonrpc/router/subscription.rs index 3774123610..6feee7e64c 100644 --- a/crates/rpc/src/jsonrpc/router/subscription.rs +++ b/crates/rpc/src/jsonrpc/router/subscription.rs @@ -60,9 +60,6 @@ pub trait RpcSubscriptionFlow: Send + Sync { type Request: crate::dto::DeserializeForVersion + Send + Sync + 'static; type Notification: crate::dto::serialize::SerializeForVersion + Send + Sync + 'static; - /// The value for the `method` field of the subscription notification. - fn subscription_name() -> &'static str; - /// The block to start streaming from. If the subscription endpoint does not /// support catching up, the value returned by this method is /// irrelevant. @@ -76,10 +73,25 @@ pub trait RpcSubscriptionFlow: Send + Sync { req: &Self::Request, from: BlockNumber, to: BlockNumber, - ) -> Result, RpcError>; + ) -> Result>, RpcError>; /// Subscribe to active updates. - async fn subscribe(state: RpcContext, tx: mpsc::Sender<(Self::Notification, BlockNumber)>); + async fn subscribe( + state: RpcContext, + tx: mpsc::Sender>, + ); +} + +#[derive(Debug)] +pub struct SubscriptionMessage { + /// [`RpcSubscriptionFlow::Notification`] to be sent to the client. + pub notification: T, + /// The block number of the notification. If the notification does not have + /// a block number, this value does not matter. + pub block_number: BlockNumber, + /// The value for the `method` field of the subscription notification sent + /// to the client. + pub subscription_name: &'static str, } #[axum::async_trait] @@ -102,7 +114,6 @@ where subscription_id, subscriptions, tx, - subscription_name: T::subscription_name(), version, _phantom: Default::default(), }; @@ -143,12 +154,16 @@ where // Caught up. break; } - for (message, block_number) in messages { - if tx.send(message).await.is_err() { + for msg in messages { + if tx + .send(msg.notification, msg.subscription_name) + .await + .is_err() + { // Subscription closing. return Ok(()); } - current_block = block_number; + current_block = msg.block_number; } // Increment the current block by 1 because the catch_up range is inclusive. current_block += 1; @@ -158,9 +173,9 @@ where }; // Subscribe to new blocks. Receive the first subscription message. - let (tx1, mut rx1) = mpsc::channel::<(T::Notification, BlockNumber)>(1024); + let (tx1, mut rx1) = mpsc::channel::>(1024); tokio::spawn(T::subscribe(state.clone(), tx1)); - let (first_message, block_number) = match rx1.recv().await { + let first_msg = match rx1.recv().await { Some(msg) => msg, None => { // Subscription closing. @@ -172,10 +187,14 @@ where // block that will be streamed from the subscription. This way we don't miss any // blocks. Because the catch_up range is inclusive, we need to subtract 1 from // the block number. - if let Some(block_number) = block_number.parent() { + if let Some(block_number) = first_msg.block_number.parent() { let messages = T::catch_up(&state, &req, current_block, block_number).await?; - for (message, _) in messages { - if tx.send(message).await.is_err() { + for msg in messages { + if tx + .send(msg.notification, msg.subscription_name) + .await + .is_err() + { // Subscription closing. return Ok(()); } @@ -183,23 +202,31 @@ where } // Send the first subscription message and then forward the rest. - if tx.send(first_message).await.is_err() { + if tx + .send(first_msg.notification, first_msg.subscription_name) + .await + .is_err() + { // Subscription closing. return Ok(()); } - let mut last_block = block_number; + let mut last_block = first_msg.block_number; tokio::spawn(async move { - while let Some((message, block_number)) = rx1.recv().await { - if block_number.get() > last_block.get() + 1 { + while let Some(msg) = rx1.recv().await { + if msg.block_number.get() > last_block.get() + 1 { // One or more blocks have been skipped. This is likely due to a race condition // resulting from a reorg. This message should be ignored. continue; } - if tx.send(message).await.is_err() { + if tx + .send(msg.notification, msg.subscription_name) + .await + .is_err() + { // Subscription closing. break; } - last_block = block_number; + last_block = msg.block_number; } }); Ok(()) @@ -455,7 +482,6 @@ struct SubscriptionSender { subscription_id: SubscriptionId, subscriptions: Arc>>, tx: mpsc::Sender>, - subscription_name: &'static str, version: RpcVersion, _phantom: std::marker::PhantomData, } @@ -466,7 +492,6 @@ impl Clone for SubscriptionSender { subscription_id: self.subscription_id, subscriptions: self.subscriptions.clone(), tx: self.tx.clone(), - subscription_name: self.subscription_name, version: self.version, _phantom: Default::default(), } @@ -474,14 +499,18 @@ impl Clone for SubscriptionSender { } impl SubscriptionSender { - pub async fn send(&self, value: T) -> Result<(), mpsc::error::SendError<()>> { + pub async fn send( + &self, + value: T, + subscription_name: &'static str, + ) -> Result<(), mpsc::error::SendError<()>> { if !self.subscriptions.contains_key(&self.subscription_id) { // Race condition due to the subscription ending. return Ok(()); } let notification = RpcNotification { jsonrpc: "2.0", - method: self.subscription_name, + method: subscription_name, params: SubscriptionResult { subscription_id: self.subscription_id, result: value, diff --git a/crates/rpc/src/lib.rs b/crates/rpc/src/lib.rs index db0824cda6..f8aceee372 100644 --- a/crates/rpc/src/lib.rs +++ b/crates/rpc/src/lib.rs @@ -28,7 +28,7 @@ use axum::response::IntoResponse; use context::RpcContext; pub use executor::compose_executor_transaction; use http_body::Body; -pub use jsonrpc::Notifications; +pub use jsonrpc::{Notifications, Reorg}; use pathfinder_common::AllowedOrigins; pub use pending::PendingData; use tokio::sync::RwLock; diff --git a/crates/rpc/src/method.rs b/crates/rpc/src/method.rs index 039f8d73cd..225fc5764a 100644 --- a/crates/rpc/src/method.rs +++ b/crates/rpc/src/method.rs @@ -56,3 +56,5 @@ pub use simulate_transactions::simulate_transactions; pub use syncing::syncing; pub use trace_block_transactions::trace_block_transactions; pub use trace_transaction::trace_transaction; + +const REORG_SUBSCRIPTION_NAME: &str = "starknet_subscriptionReorg"; diff --git a/crates/rpc/src/method/subscribe_new_heads.rs b/crates/rpc/src/method/subscribe_new_heads.rs index 0a1009361b..b1c36cee6d 100644 --- a/crates/rpc/src/method/subscribe_new_heads.rs +++ b/crates/rpc/src/method/subscribe_new_heads.rs @@ -4,8 +4,10 @@ use axum::async_trait; use pathfinder_common::{BlockId, BlockNumber}; use tokio::sync::mpsc; +use super::REORG_SUBSCRIPTION_NAME; use crate::context::RpcContext; -use crate::jsonrpc::{RpcError, RpcSubscriptionFlow}; +use crate::jsonrpc::{RpcError, RpcSubscriptionFlow, SubscriptionMessage}; +use crate::Reorg; pub struct SubscribeNewHeads; @@ -25,26 +27,30 @@ impl crate::dto::DeserializeForVersion for Request { } #[derive(Debug)] -pub struct Message(Arc); +pub enum Message { + BlockHeader(Arc), + Reorg(Arc), +} impl crate::dto::serialize::SerializeForVersion for Message { fn serialize( &self, serializer: crate::dto::serialize::Serializer, ) -> Result { - crate::dto::BlockHeader(&self.0).serialize(serializer) + match self { + Self::BlockHeader(header) => crate::dto::BlockHeader(header).serialize(serializer), + Self::Reorg(reorg) => reorg.serialize(serializer), + } } } +const SUBSCRIPTION_NAME: &str = "starknet_subscriptionNewHeads"; + #[async_trait] impl RpcSubscriptionFlow for SubscribeNewHeads { type Request = Request; type Notification = Message; - fn subscription_name() -> &'static str { - "starknet_subscriptionNewHeads" - } - fn starting_block(req: &Self::Request) -> BlockId { req.block } @@ -54,7 +60,7 @@ impl RpcSubscriptionFlow for SubscribeNewHeads { _req: &Self::Request, from: BlockNumber, to: BlockNumber, - ) -> Result, RpcError> { + ) -> Result>, RpcError> { let storage = state.storage.clone(); let headers = tokio::task::spawn_blocking(move || -> Result<_, RpcError> { let mut conn = storage.connection().map_err(RpcError::InternalError)?; @@ -67,28 +73,66 @@ impl RpcSubscriptionFlow for SubscribeNewHeads { .into_iter() .map(|header| { let block_number = header.number; - (Message(header.into()), block_number) + SubscriptionMessage { + notification: Message::BlockHeader(header.into()), + block_number, + subscription_name: SUBSCRIPTION_NAME, + } }) .collect()) } - async fn subscribe(state: RpcContext, tx: mpsc::Sender<(Self::Notification, BlockNumber)>) { - let mut rx = state.notifications.block_headers.subscribe(); + async fn subscribe( + state: RpcContext, + tx: mpsc::Sender>, + ) { + let mut headers = state.notifications.block_headers.subscribe(); + let mut reorgs = state.notifications.reorgs.subscribe(); loop { - match rx.recv().await { - Ok(header) => { - let block_number = header.number; - if tx.send((Message(header), block_number)).await.is_err() { - break; + tokio::select! { + reorg = reorgs.recv() => { + match reorg { + Ok(reorg) => { + let block_number = reorg.first_block_number; + if tx.send(SubscriptionMessage { + notification: Message::Reorg(reorg), + block_number, + subscription_name: REORG_SUBSCRIPTION_NAME, + }).await.is_err() { + break; + } + } + Err(e) => { + tracing::debug!( + "Error receiving reorg from notifications channel, node might be \ + lagging: {:?}", + e + ); + break; + } } } - Err(e) => { - tracing::debug!( - "Error receiving block header from notifications channel, node might be \ - lagging: {:?}", - e - ); - break; + header = headers.recv() => { + match header { + Ok(header) => { + let block_number = header.number; + if tx.send(SubscriptionMessage { + notification: Message::BlockHeader(header), + block_number, + subscription_name: SUBSCRIPTION_NAME, + }).await.is_err() { + break; + } + } + Err(e) => { + tracing::debug!( + "Error receiving block header from notifications channel, node might be \ + lagging: {:?}", + e + ); + break; + } + } } } } @@ -100,7 +144,7 @@ mod tests { use std::time::Duration; use axum::extract::ws::Message; - use pathfinder_common::{BlockHash, BlockHeader, BlockNumber, ChainId}; + use pathfinder_common::{felt, BlockHash, BlockHeader, BlockNumber, ChainId}; use pathfinder_crypto::Felt; use pathfinder_storage::StorageBuilder; use starknet_gateway_client::Client; @@ -110,7 +154,7 @@ mod tests { use crate::jsonrpc::{handle_json_rpc_socket, RpcResponse, RpcRouter}; use crate::pending::PendingWatcher; use crate::v02::types::syncing::Syncing; - use crate::{v08, Notifications, SubscriptionId, SyncState}; + use crate::{v08, Notifications, Reorg, SubscriptionId, SyncState}; #[tokio::test] async fn happy_path_with_historic_blocks() { @@ -132,10 +176,50 @@ mod tests { happy_path_test(0).await; } + #[tokio::test] + async fn reorg() { + let (_, mut rx, subscription_id, router) = happy_path_test(0).await; + router + .context + .notifications + .reorgs + .send( + Reorg { + first_block_number: BlockNumber::new_or_panic(1), + first_block_hash: BlockHash(felt!("0x1")), + last_block_number: BlockNumber::new_or_panic(2), + last_block_hash: BlockHash(felt!("0x2")), + } + .into(), + ) + .unwrap(); + let res = rx.recv().await.unwrap().unwrap(); + let json: serde_json::Value = match res { + Message::Text(json) => serde_json::from_str(&json).unwrap(), + _ => panic!("Expected text message"), + }; + assert_eq!( + json, + serde_json::json!({ + "jsonrpc": "2.0", + "method": "starknet_subscriptionReorg", + "params": { + "result": { + "first_block_hash": "0x1", + "first_block_number": 1, + "last_block_hash": "0x2", + "last_block_number": 2 + }, + "subscription_id": subscription_id.0 + } + }) + ); + } + #[tokio::test] async fn race_condition_with_historic_blocks() { let num_blocks = 1000; - let router = setup(num_blocks); + let router = setup(num_blocks).await; let (sender_tx, mut sender_rx) = mpsc::channel(1024); let (receiver_tx, receiver_rx) = mpsc::channel(1024); handle_json_rpc_socket(router.clone(), sender_tx, receiver_rx); @@ -176,20 +260,28 @@ mod tests { } // Insert more blocks before the active updates kick in. This simulates a // real-world race condition. - for i in 0..num_blocks { - let mut conn = router.context.storage.connection().unwrap(); - let db = conn.transaction().unwrap(); - let header = sample_header(i + num_blocks); - db.insert_block_header(&header).unwrap(); - db.commit().unwrap(); - } + let storage = router.context.storage.clone(); + tokio::task::spawn_blocking(move || { + for i in 0..num_blocks { + let mut conn = storage.connection().unwrap(); + let db = conn.transaction().unwrap(); + let header = sample_header(i + num_blocks); + db.insert_block_header(&header).unwrap(); + db.commit().unwrap(); + } + }) + .await + .unwrap(); for i in 0..10 { - router - .context - .notifications - .block_headers - .send(sample_header(i + 2 * num_blocks).into()) - .unwrap(); + retry(|| { + router + .context + .notifications + .block_headers + .send(sample_header(i + 2 * num_blocks).into()) + }) + .await + .unwrap(); if i == 0 { // First, expect all the newly inserted blocks. for j in 0..num_blocks { @@ -252,15 +344,22 @@ mod tests { assert!(rx.is_empty()); } - fn setup(num_blocks: u64) -> RpcRouter { + async fn setup(num_blocks: u64) -> RpcRouter { let storage = StorageBuilder::in_memory().unwrap(); - let mut conn = storage.connection().unwrap(); - let db = conn.transaction().unwrap(); - for i in 0..num_blocks { - let header = sample_header(i); - db.insert_block_header(&header).unwrap(); - } - db.commit().unwrap(); + tokio::task::spawn_blocking({ + let storage = storage.clone(); + move || { + let mut conn = storage.connection().unwrap(); + let db = conn.transaction().unwrap(); + for i in 0..num_blocks { + let header = sample_header(i); + db.insert_block_header(&header).unwrap(); + } + db.commit().unwrap(); + } + }) + .await + .unwrap(); let (_, pending_data) = tokio::sync::watch::channel(Default::default()); let notifications = Notifications::default(); let ctx = RpcContext { @@ -294,7 +393,7 @@ mod tests { SubscriptionId, RpcRouter, ) { - let router = setup(num_blocks); + let router = setup(num_blocks).await; let (sender_tx, mut sender_rx) = mpsc::channel(1024); let (receiver_tx, receiver_rx) = mpsc::channel(1024); handle_json_rpc_socket(router.clone(), sender_tx, receiver_rx); @@ -402,14 +501,14 @@ mod tests { where E: std::fmt::Debug, { - for i in 0..10 { + for i in 0..25 { match cb() { Ok(result) => return Ok(result), Err(e) => { - if i == 9 { + if i == 24 { return Err(e); } - tokio::task::yield_now().await; + tokio::time::sleep(Duration::from_secs(i)).await; } } }