Skip to content

Commit

Permalink
Merge pull request #2249 from eqlabs/sistemd/starknet-subscribe-new-h…
Browse files Browse the repository at this point in the history
…eads-reorgs

feat(rpc): reorgs in `starknet_subscribeNewHeads`
  • Loading branch information
sistemd authored Sep 20, 2024
2 parents be4f25b + 775a949 commit f4fcad9
Show file tree
Hide file tree
Showing 9 changed files with 286 additions and 84 deletions.
40 changes: 36 additions & 4 deletions crates/pathfinder/src/state/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use pathfinder_ethereum::{EthereumApi, EthereumStateUpdate};
use pathfinder_merkle_tree::contract_state::update_contract_state;
use pathfinder_merkle_tree::{ClassCommitmentTree, StorageCommitmentTree};
use pathfinder_rpc::v02::types::syncing::{self, NumberedBlock, Syncing};
use pathfinder_rpc::{Notifications, PendingData, SyncState, TopicBroadcasters};
use pathfinder_rpc::{Notifications, PendingData, Reorg, SyncState, TopicBroadcasters};
use pathfinder_storage::{Connection, Storage, Transaction, TransactionBehavior};
use primitive_types::H160;
use starknet_gateway_client::GatewayApi;
Expand Down Expand Up @@ -603,7 +603,7 @@ async fn consumer(
}
Reorg(reorg_tail) => {
tracing::trace!("Reorg L2 state to block {}", reorg_tail);
l2_reorg(&mut db_conn, reorg_tail)
l2_reorg(&mut db_conn, reorg_tail, &mut notifications)
.await
.with_context(|| format!("Reorg L2 state to {reorg_tail:?}"))?;

Expand Down Expand Up @@ -1011,7 +1011,11 @@ async fn l2_update(
Ok(())
}

async fn l2_reorg(connection: &mut Connection, reorg_tail: BlockNumber) -> anyhow::Result<()> {
async fn l2_reorg(
connection: &mut Connection,
reorg_tail: BlockNumber,
notifications: &mut Notifications,
) -> anyhow::Result<()> {
tokio::task::block_in_place(move || {
let transaction = connection
.transaction_with_behavior(TransactionBehavior::Immediate)
Expand All @@ -1023,6 +1027,15 @@ async fn l2_reorg(connection: &mut Connection, reorg_tail: BlockNumber) -> anyho
.context("Latest block number is none during reorg")?
.0;

let reorg_tail_hash = transaction
.block_hash(reorg_tail.into())
.context("Fetching first block hash")?
.context("Expected first block hash to exist")?;
let head_hash = transaction
.block_hash(head.into())
.context("Fetching last block hash")?
.context("Expected last block hash to exist")?;

transaction
.increment_reorg_counter()
.context("Incrementing reorg counter")?;
Expand Down Expand Up @@ -1076,7 +1089,26 @@ async fn l2_reorg(connection: &mut Connection, reorg_tail: BlockNumber) -> anyho
}
}

transaction.commit().context("Commit database transaction")
transaction
.commit()
.context("Commit database transaction")?;

notifications
.reorgs
.send(
Reorg {
first_block_number: reorg_tail,
first_block_hash: reorg_tail_hash,
last_block_number: head,
last_block_hash: head_hash,
}
.into(),
)
// Ignore errors in case nobody is listening. New listeners may subscribe in the
// future.
.ok();

Ok(())
})
}

Expand Down
1 change: 0 additions & 1 deletion crates/pathfinder/src/state/sync/l2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1123,7 +1123,6 @@ async fn reorg(

#[cfg(test)]
mod tests {

mod sync {
use std::sync::LazyLock;

Expand Down
21 changes: 21 additions & 0 deletions crates/rpc/src/dto/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use pathfinder_common::{GasPrice, L1DataAvailabilityMode};
use serde::de::Error;

use super::serialize::SerializeStruct;
use crate::Reorg;

#[derive(Debug)]
pub struct BlockHeader<'a>(pub &'a pathfinder_common::BlockHeader);
Expand Down Expand Up @@ -135,3 +136,23 @@ impl crate::dto::serialize::SerializeForVersion for ResourcePrice {
serializer.end()
}
}

impl crate::dto::serialize::SerializeForVersion for Reorg {
fn serialize(
&self,
serializer: super::serialize::Serializer,
) -> Result<super::serialize::Ok, super::serialize::Error> {
let mut serializer = serializer.serialize_struct()?;
serializer.serialize_field("first_block_number", &self.first_block_number.get())?;
serializer.serialize_field(
"first_block_hash",
&crate::dto::Felt(&self.first_block_hash.0),
)?;
serializer.serialize_field("last_block_number", &self.last_block_number.get())?;
serializer.serialize_field(
"last_block_hash",
&crate::dto::Felt(&self.last_block_hash.0),
)?;
serializer.end()
}
}
24 changes: 22 additions & 2 deletions crates/rpc/src/jsonrpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,18 @@ pub mod websocket;
use std::sync::Arc;

pub use error::RpcError;
use pathfinder_common::{BlockHash, BlockNumber};
pub use request::RpcRequest;
pub use response::RpcResponse;
#[cfg(test)]
pub use router::handle_json_rpc_socket;
pub use router::{rpc_handler, RpcRouter, RpcRouterBuilder, RpcSubscriptionFlow};
pub use router::{
rpc_handler,
RpcRouter,
RpcRouterBuilder,
RpcSubscriptionFlow,
SubscriptionMessage,
};
use tokio::sync::broadcast;

#[derive(Debug, PartialEq, Clone)]
Expand All @@ -33,11 +40,24 @@ impl RequestId {
#[derive(Debug, Clone)]
pub struct Notifications {
pub block_headers: broadcast::Sender<Arc<pathfinder_common::BlockHeader>>,
pub reorgs: broadcast::Sender<Arc<Reorg>>,
}

#[derive(Debug, Clone)]
pub struct Reorg {
pub first_block_number: BlockNumber,
pub first_block_hash: BlockHash,
pub last_block_number: BlockNumber,
pub last_block_hash: BlockHash,
}

impl Default for Notifications {
fn default() -> Self {
let (block_headers, _) = broadcast::channel(1024);
Self { block_headers }
let (reorgs, _) = broadcast::channel(1024);
Self {
block_headers,
reorgs,
}
}
}
2 changes: 1 addition & 1 deletion crates/rpc/src/jsonrpc/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use axum::response::IntoResponse;
use futures::{Future, FutureExt, StreamExt};
use http::HeaderValue;
use method::RpcMethodEndpoint;
pub use subscription::{handle_json_rpc_socket, RpcSubscriptionFlow};
pub use subscription::{handle_json_rpc_socket, RpcSubscriptionFlow, SubscriptionMessage};
use subscription::{split_ws, RpcSubscriptionEndpoint};

use crate::context::RpcContext;
Expand Down
77 changes: 53 additions & 24 deletions crates/rpc/src/jsonrpc/router/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,6 @@ pub trait RpcSubscriptionFlow: Send + Sync {
type Request: crate::dto::DeserializeForVersion + Send + Sync + 'static;
type Notification: crate::dto::serialize::SerializeForVersion + Send + Sync + 'static;

/// The value for the `method` field of the subscription notification.
fn subscription_name() -> &'static str;

/// The block to start streaming from. If the subscription endpoint does not
/// support catching up, the value returned by this method is
/// irrelevant.
Expand All @@ -76,10 +73,25 @@ pub trait RpcSubscriptionFlow: Send + Sync {
req: &Self::Request,
from: BlockNumber,
to: BlockNumber,
) -> Result<Vec<(Self::Notification, BlockNumber)>, RpcError>;
) -> Result<Vec<SubscriptionMessage<Self::Notification>>, RpcError>;

/// Subscribe to active updates.
async fn subscribe(state: RpcContext, tx: mpsc::Sender<(Self::Notification, BlockNumber)>);
async fn subscribe(
state: RpcContext,
tx: mpsc::Sender<SubscriptionMessage<Self::Notification>>,
);
}

#[derive(Debug)]
pub struct SubscriptionMessage<T> {
/// [`RpcSubscriptionFlow::Notification`] to be sent to the client.
pub notification: T,
/// The block number of the notification. If the notification does not have
/// a block number, this value does not matter.
pub block_number: BlockNumber,
/// The value for the `method` field of the subscription notification sent
/// to the client.
pub subscription_name: &'static str,
}

#[axum::async_trait]
Expand All @@ -102,7 +114,6 @@ where
subscription_id,
subscriptions,
tx,
subscription_name: T::subscription_name(),
version,
_phantom: Default::default(),
};
Expand Down Expand Up @@ -143,12 +154,16 @@ where
// Caught up.
break;
}
for (message, block_number) in messages {
if tx.send(message).await.is_err() {
for msg in messages {
if tx
.send(msg.notification, msg.subscription_name)
.await
.is_err()
{
// Subscription closing.
return Ok(());
}
current_block = block_number;
current_block = msg.block_number;
}
// Increment the current block by 1 because the catch_up range is inclusive.
current_block += 1;
Expand All @@ -158,9 +173,9 @@ where
};

// Subscribe to new blocks. Receive the first subscription message.
let (tx1, mut rx1) = mpsc::channel::<(T::Notification, BlockNumber)>(1024);
let (tx1, mut rx1) = mpsc::channel::<SubscriptionMessage<T::Notification>>(1024);
tokio::spawn(T::subscribe(state.clone(), tx1));
let (first_message, block_number) = match rx1.recv().await {
let first_msg = match rx1.recv().await {
Some(msg) => msg,
None => {
// Subscription closing.
Expand All @@ -172,34 +187,46 @@ where
// block that will be streamed from the subscription. This way we don't miss any
// blocks. Because the catch_up range is inclusive, we need to subtract 1 from
// the block number.
if let Some(block_number) = block_number.parent() {
if let Some(block_number) = first_msg.block_number.parent() {
let messages = T::catch_up(&state, &req, current_block, block_number).await?;
for (message, _) in messages {
if tx.send(message).await.is_err() {
for msg in messages {
if tx
.send(msg.notification, msg.subscription_name)
.await
.is_err()
{
// Subscription closing.
return Ok(());
}
}
}

// Send the first subscription message and then forward the rest.
if tx.send(first_message).await.is_err() {
if tx
.send(first_msg.notification, first_msg.subscription_name)
.await
.is_err()
{
// Subscription closing.
return Ok(());
}
let mut last_block = block_number;
let mut last_block = first_msg.block_number;
tokio::spawn(async move {
while let Some((message, block_number)) = rx1.recv().await {
if block_number.get() > last_block.get() + 1 {
while let Some(msg) = rx1.recv().await {
if msg.block_number.get() > last_block.get() + 1 {
// One or more blocks have been skipped. This is likely due to a race condition
// resulting from a reorg. This message should be ignored.
continue;
}
if tx.send(message).await.is_err() {
if tx
.send(msg.notification, msg.subscription_name)
.await
.is_err()
{
// Subscription closing.
break;
}
last_block = block_number;
last_block = msg.block_number;
}
});
Ok(())
Expand Down Expand Up @@ -455,7 +482,6 @@ struct SubscriptionSender<T> {
subscription_id: SubscriptionId,
subscriptions: Arc<DashMap<SubscriptionId, tokio::task::JoinHandle<()>>>,
tx: mpsc::Sender<Result<Message, RpcResponse>>,
subscription_name: &'static str,
version: RpcVersion,
_phantom: std::marker::PhantomData<T>,
}
Expand All @@ -466,22 +492,25 @@ impl<T> Clone for SubscriptionSender<T> {
subscription_id: self.subscription_id,
subscriptions: self.subscriptions.clone(),
tx: self.tx.clone(),
subscription_name: self.subscription_name,
version: self.version,
_phantom: Default::default(),
}
}
}

impl<T: crate::dto::serialize::SerializeForVersion> SubscriptionSender<T> {
pub async fn send(&self, value: T) -> Result<(), mpsc::error::SendError<()>> {
pub async fn send(
&self,
value: T,
subscription_name: &'static str,
) -> Result<(), mpsc::error::SendError<()>> {
if !self.subscriptions.contains_key(&self.subscription_id) {
// Race condition due to the subscription ending.
return Ok(());
}
let notification = RpcNotification {
jsonrpc: "2.0",
method: self.subscription_name,
method: subscription_name,
params: SubscriptionResult {
subscription_id: self.subscription_id,
result: value,
Expand Down
2 changes: 1 addition & 1 deletion crates/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use axum::response::IntoResponse;
use context::RpcContext;
pub use executor::compose_executor_transaction;
use http_body::Body;
pub use jsonrpc::Notifications;
pub use jsonrpc::{Notifications, Reorg};
use pathfinder_common::AllowedOrigins;
pub use pending::PendingData;
use tokio::sync::RwLock;
Expand Down
2 changes: 2 additions & 0 deletions crates/rpc/src/method.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,5 @@ pub use simulate_transactions::simulate_transactions;
pub use syncing::syncing;
pub use trace_block_transactions::trace_block_transactions;
pub use trace_transaction::trace_transaction;

const REORG_SUBSCRIPTION_NAME: &str = "starknet_subscriptionReorg";
Loading

0 comments on commit f4fcad9

Please sign in to comment.