Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rpc): reorgs in starknet_subscribeNewHeads #2249

Merged
merged 3 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 36 additions & 4 deletions crates/pathfinder/src/state/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use pathfinder_ethereum::{EthereumApi, EthereumStateUpdate};
use pathfinder_merkle_tree::contract_state::update_contract_state;
use pathfinder_merkle_tree::{ClassCommitmentTree, StorageCommitmentTree};
use pathfinder_rpc::v02::types::syncing::{self, NumberedBlock, Syncing};
use pathfinder_rpc::{Notifications, PendingData, SyncState, TopicBroadcasters};
use pathfinder_rpc::{Notifications, PendingData, Reorg, SyncState, TopicBroadcasters};
use pathfinder_storage::{Connection, Storage, Transaction, TransactionBehavior};
use primitive_types::H160;
use starknet_gateway_client::GatewayApi;
Expand Down Expand Up @@ -603,7 +603,7 @@ async fn consumer(
}
Reorg(reorg_tail) => {
tracing::trace!("Reorg L2 state to block {}", reorg_tail);
l2_reorg(&mut db_conn, reorg_tail)
l2_reorg(&mut db_conn, reorg_tail, &mut notifications)
.await
.with_context(|| format!("Reorg L2 state to {reorg_tail:?}"))?;

Expand Down Expand Up @@ -1011,7 +1011,11 @@ async fn l2_update(
Ok(())
}

async fn l2_reorg(connection: &mut Connection, reorg_tail: BlockNumber) -> anyhow::Result<()> {
async fn l2_reorg(
connection: &mut Connection,
reorg_tail: BlockNumber,
notifications: &mut Notifications,
) -> anyhow::Result<()> {
tokio::task::block_in_place(move || {
let transaction = connection
.transaction_with_behavior(TransactionBehavior::Immediate)
Expand All @@ -1023,6 +1027,15 @@ async fn l2_reorg(connection: &mut Connection, reorg_tail: BlockNumber) -> anyho
.context("Latest block number is none during reorg")?
.0;

let reorg_tail_hash = transaction
.block_hash(reorg_tail.into())
.context("Fetching first block hash")?
.context("Expected first block hash to exist")?;
let head_hash = transaction
.block_hash(head.into())
.context("Fetching last block hash")?
.context("Expected last block hash to exist")?;

transaction
.increment_reorg_counter()
.context("Incrementing reorg counter")?;
Expand Down Expand Up @@ -1076,7 +1089,26 @@ async fn l2_reorg(connection: &mut Connection, reorg_tail: BlockNumber) -> anyho
}
}

transaction.commit().context("Commit database transaction")
transaction
.commit()
.context("Commit database transaction")?;

notifications
.reorgs
.send(
Reorg {
first_block_number: reorg_tail,
first_block_hash: reorg_tail_hash,
last_block_number: head,
last_block_hash: head_hash,
}
.into(),
)
// Ignore errors in case nobody is listening. New listeners may subscribe in the
// future.
.ok();

Ok(())
})
}

Expand Down
1 change: 0 additions & 1 deletion crates/pathfinder/src/state/sync/l2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1123,7 +1123,6 @@ async fn reorg(

#[cfg(test)]
mod tests {

mod sync {
use std::sync::LazyLock;

Expand Down
21 changes: 21 additions & 0 deletions crates/rpc/src/dto/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use pathfinder_common::{GasPrice, L1DataAvailabilityMode};
use serde::de::Error;

use super::serialize::SerializeStruct;
use crate::Reorg;

#[derive(Debug)]
pub struct BlockHeader<'a>(pub &'a pathfinder_common::BlockHeader);
Expand Down Expand Up @@ -135,3 +136,23 @@ impl crate::dto::serialize::SerializeForVersion for ResourcePrice {
serializer.end()
}
}

impl crate::dto::serialize::SerializeForVersion for Reorg {
fn serialize(
&self,
serializer: super::serialize::Serializer,
) -> Result<super::serialize::Ok, super::serialize::Error> {
let mut serializer = serializer.serialize_struct()?;
serializer.serialize_field("first_block_number", &self.first_block_number.get())?;
serializer.serialize_field(
"first_block_hash",
&crate::dto::Felt(&self.first_block_hash.0),
)?;
serializer.serialize_field("last_block_number", &self.last_block_number.get())?;
serializer.serialize_field(
"last_block_hash",
&crate::dto::Felt(&self.last_block_hash.0),
)?;
serializer.end()
}
}
24 changes: 22 additions & 2 deletions crates/rpc/src/jsonrpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,18 @@ pub mod websocket;
use std::sync::Arc;

pub use error::RpcError;
use pathfinder_common::{BlockHash, BlockNumber};
pub use request::RpcRequest;
pub use response::RpcResponse;
#[cfg(test)]
pub use router::handle_json_rpc_socket;
pub use router::{rpc_handler, RpcRouter, RpcRouterBuilder, RpcSubscriptionFlow};
pub use router::{
rpc_handler,
RpcRouter,
RpcRouterBuilder,
RpcSubscriptionFlow,
SubscriptionMessage,
};
use tokio::sync::broadcast;

#[derive(Debug, PartialEq, Clone)]
Expand All @@ -33,11 +40,24 @@ impl RequestId {
#[derive(Debug, Clone)]
pub struct Notifications {
pub block_headers: broadcast::Sender<Arc<pathfinder_common::BlockHeader>>,
pub reorgs: broadcast::Sender<Arc<Reorg>>,
}

#[derive(Debug, Clone)]
pub struct Reorg {
pub first_block_number: BlockNumber,
pub first_block_hash: BlockHash,
pub last_block_number: BlockNumber,
pub last_block_hash: BlockHash,
}

impl Default for Notifications {
fn default() -> Self {
let (block_headers, _) = broadcast::channel(1024);
Self { block_headers }
let (reorgs, _) = broadcast::channel(1024);
Self {
block_headers,
reorgs,
}
}
}
2 changes: 1 addition & 1 deletion crates/rpc/src/jsonrpc/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use axum::response::IntoResponse;
use futures::{Future, FutureExt, StreamExt};
use http::HeaderValue;
use method::RpcMethodEndpoint;
pub use subscription::{handle_json_rpc_socket, RpcSubscriptionFlow};
pub use subscription::{handle_json_rpc_socket, RpcSubscriptionFlow, SubscriptionMessage};
use subscription::{split_ws, RpcSubscriptionEndpoint};

use crate::context::RpcContext;
Expand Down
77 changes: 53 additions & 24 deletions crates/rpc/src/jsonrpc/router/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,6 @@ pub trait RpcSubscriptionFlow: Send + Sync {
type Request: crate::dto::DeserializeForVersion + Send + Sync + 'static;
type Notification: crate::dto::serialize::SerializeForVersion + Send + Sync + 'static;

/// The value for the `method` field of the subscription notification.
fn subscription_name() -> &'static str;

/// The block to start streaming from. If the subscription endpoint does not
/// support catching up, the value returned by this method is
/// irrelevant.
Expand All @@ -76,10 +73,25 @@ pub trait RpcSubscriptionFlow: Send + Sync {
req: &Self::Request,
from: BlockNumber,
to: BlockNumber,
) -> Result<Vec<(Self::Notification, BlockNumber)>, RpcError>;
) -> Result<Vec<SubscriptionMessage<Self::Notification>>, RpcError>;

/// Subscribe to active updates.
async fn subscribe(state: RpcContext, tx: mpsc::Sender<(Self::Notification, BlockNumber)>);
async fn subscribe(
state: RpcContext,
tx: mpsc::Sender<SubscriptionMessage<Self::Notification>>,
);
}

#[derive(Debug)]
pub struct SubscriptionMessage<T> {
/// [`RpcSubscriptionFlow::Notification`] to be sent to the client.
pub notification: T,
/// The block number of the notification. If the notification does not have
/// a block number, this value does not matter.
pub block_number: BlockNumber,
/// The value for the `method` field of the subscription notification sent
/// to the client.
pub subscription_name: &'static str,
}

#[axum::async_trait]
Expand All @@ -102,7 +114,6 @@ where
subscription_id,
subscriptions,
tx,
subscription_name: T::subscription_name(),
version,
_phantom: Default::default(),
};
Expand Down Expand Up @@ -143,12 +154,16 @@ where
// Caught up.
break;
}
for (message, block_number) in messages {
if tx.send(message).await.is_err() {
for msg in messages {
if tx
.send(msg.notification, msg.subscription_name)
.await
.is_err()
{
// Subscription closing.
return Ok(());
}
current_block = block_number;
current_block = msg.block_number;
}
// Increment the current block by 1 because the catch_up range is inclusive.
current_block += 1;
Expand All @@ -158,9 +173,9 @@ where
};

// Subscribe to new blocks. Receive the first subscription message.
let (tx1, mut rx1) = mpsc::channel::<(T::Notification, BlockNumber)>(1024);
let (tx1, mut rx1) = mpsc::channel::<SubscriptionMessage<T::Notification>>(1024);
tokio::spawn(T::subscribe(state.clone(), tx1));
let (first_message, block_number) = match rx1.recv().await {
let first_msg = match rx1.recv().await {
Some(msg) => msg,
None => {
// Subscription closing.
Expand All @@ -172,34 +187,46 @@ where
// block that will be streamed from the subscription. This way we don't miss any
// blocks. Because the catch_up range is inclusive, we need to subtract 1 from
// the block number.
if let Some(block_number) = block_number.parent() {
if let Some(block_number) = first_msg.block_number.parent() {
let messages = T::catch_up(&state, &req, current_block, block_number).await?;
for (message, _) in messages {
if tx.send(message).await.is_err() {
for msg in messages {
if tx
.send(msg.notification, msg.subscription_name)
.await
.is_err()
{
// Subscription closing.
return Ok(());
}
}
}

// Send the first subscription message and then forward the rest.
if tx.send(first_message).await.is_err() {
if tx
.send(first_msg.notification, first_msg.subscription_name)
.await
.is_err()
{
// Subscription closing.
return Ok(());
}
let mut last_block = block_number;
let mut last_block = first_msg.block_number;
tokio::spawn(async move {
while let Some((message, block_number)) = rx1.recv().await {
if block_number.get() > last_block.get() + 1 {
while let Some(msg) = rx1.recv().await {
if msg.block_number.get() > last_block.get() + 1 {
// One or more blocks have been skipped. This is likely due to a race condition
// resulting from a reorg. This message should be ignored.
continue;
}
if tx.send(message).await.is_err() {
if tx
.send(msg.notification, msg.subscription_name)
.await
.is_err()
{
// Subscription closing.
break;
}
last_block = block_number;
last_block = msg.block_number;
}
});
Ok(())
Expand Down Expand Up @@ -455,7 +482,6 @@ struct SubscriptionSender<T> {
subscription_id: SubscriptionId,
subscriptions: Arc<DashMap<SubscriptionId, tokio::task::JoinHandle<()>>>,
tx: mpsc::Sender<Result<Message, RpcResponse>>,
subscription_name: &'static str,
version: RpcVersion,
_phantom: std::marker::PhantomData<T>,
}
Expand All @@ -466,22 +492,25 @@ impl<T> Clone for SubscriptionSender<T> {
subscription_id: self.subscription_id,
subscriptions: self.subscriptions.clone(),
tx: self.tx.clone(),
subscription_name: self.subscription_name,
version: self.version,
_phantom: Default::default(),
}
}
}

impl<T: crate::dto::serialize::SerializeForVersion> SubscriptionSender<T> {
pub async fn send(&self, value: T) -> Result<(), mpsc::error::SendError<()>> {
pub async fn send(
&self,
value: T,
subscription_name: &'static str,
) -> Result<(), mpsc::error::SendError<()>> {
if !self.subscriptions.contains_key(&self.subscription_id) {
// Race condition due to the subscription ending.
return Ok(());
}
let notification = RpcNotification {
jsonrpc: "2.0",
method: self.subscription_name,
method: subscription_name,
params: SubscriptionResult {
subscription_id: self.subscription_id,
result: value,
Expand Down
2 changes: 1 addition & 1 deletion crates/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use axum::response::IntoResponse;
use context::RpcContext;
pub use executor::compose_executor_transaction;
use http_body::Body;
pub use jsonrpc::Notifications;
pub use jsonrpc::{Notifications, Reorg};
use pathfinder_common::AllowedOrigins;
pub use pending::PendingData;
use tokio::sync::RwLock;
Expand Down
2 changes: 2 additions & 0 deletions crates/rpc/src/method.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,5 @@ pub use simulate_transactions::simulate_transactions;
pub use syncing::syncing;
pub use trace_block_transactions::trace_block_transactions;
pub use trace_transaction::trace_transaction;

const REORG_SUBSCRIPTION_NAME: &str = "starknet_subscriptionReorg";
Loading