diff --git a/crates/rpc/src/method/subscribe_transaction_status.rs b/crates/rpc/src/method/subscribe_transaction_status.rs index 48b5f08a53..2ed76b0c7b 100644 --- a/crates/rpc/src/method/subscribe_transaction_status.rs +++ b/crates/rpc/src/method/subscribe_transaction_status.rs @@ -3,7 +3,7 @@ use std::time::Duration; use axum::async_trait; use pathfinder_common::receipt::ExecutionStatus; -use pathfinder_common::{BlockId, BlockNumber, TransactionHash}; +use pathfinder_common::{BlockNumber, TransactionHash}; use reply::transaction_status as status; use starknet_gateway_client::GatewayApi; use starknet_gateway_types::reply; @@ -12,7 +12,6 @@ use tokio::time::MissedTickBehavior; use super::REORG_SUBSCRIPTION_NAME; use crate::context::RpcContext; -use crate::error::ApplicationError; use crate::jsonrpc::{RpcError, RpcSubscriptionFlow, SubscriptionMessage}; use crate::Reorg; @@ -21,7 +20,6 @@ pub struct SubscribeTransactionStatus; #[derive(Debug, Clone, Default)] pub struct Params { transaction_hash: TransactionHash, - block_id: Option, } impl crate::dto::DeserializeForVersion for Params { @@ -29,7 +27,6 @@ impl crate::dto::DeserializeForVersion for Params { value.deserialize_map(|value| { Ok(Self { transaction_hash: value.deserialize("transaction_hash").map(TransactionHash)?, - block_id: value.deserialize_optional_serde("block_id")?, }) }) } @@ -143,49 +140,51 @@ impl RpcSubscriptionFlow for SubscribeTransactionStatus { let mut l2_blocks = state.notifications.l2_blocks.subscribe(); let mut reorgs = state.notifications.reorgs.subscribe(); let storage = state.storage.clone(); - if let Some(first_block) = params.block_id { - // Check if we have the transaction in our database, and if so, send the - // relevant transaction status updates. - let (first_block, l1_state, tx_with_receipt) = - util::task::spawn_blocking(move |_| -> Result<_, RpcError> { - let mut conn = storage.connection().map_err(RpcError::InternalError)?; - let db = conn.transaction().map_err(RpcError::InternalError)?; - let first_block = db - .block_number(first_block.try_into().map_err(|_| { - RpcError::ApplicationError(ApplicationError::CallOnPending) - })?) - .map_err(RpcError::InternalError)?; - let l1_block_number = - db.latest_l1_state().map_err(RpcError::InternalError)?; - let tx_with_receipt = db - .transaction_with_receipt(tx_hash) - .map_err(RpcError::InternalError)?; - Ok((first_block, l1_block_number, tx_with_receipt)) - }) - .await - .map_err(|e| RpcError::InternalError(e.into()))??; - let first_block = first_block - .ok_or_else(|| RpcError::ApplicationError(ApplicationError::BlockNotFound))?; - if let Some((_, receipt, _, block_number)) = tx_with_receipt { - // We already have the transaction in the database. - if let Some(parent) = block_number.parent() { - // This transaction was pending in the parent block. - if first_block <= parent { - if sender - .send(parent, FinalityStatus::Received, None) - .await - .is_err() - { - // Subscription closing. - break; - } - } + // Check if we have the transaction in our database, and if so, send the + // relevant transaction status updates. + let (l1_state, tx_with_receipt) = + util::task::spawn_blocking(move |_| -> Result<_, RpcError> { + let mut conn = storage.connection().map_err(RpcError::InternalError)?; + let db = conn.transaction().map_err(RpcError::InternalError)?; + let l1_block_number = db.latest_l1_state().map_err(RpcError::InternalError)?; + let tx_with_receipt = db + .transaction_with_receipt(tx_hash) + .map_err(RpcError::InternalError)?; + Ok((l1_block_number, tx_with_receipt)) + }) + .await + .map_err(|e| RpcError::InternalError(e.into()))??; + if let Some((_, receipt, _, block_number)) = tx_with_receipt { + // We already have the transaction in the database. + if let Some(parent) = block_number.parent() { + // This transaction was pending in the parent block. + if sender + .send(parent, FinalityStatus::Received, None) + .await + .is_err() + { + // Subscription closing. + break; } - if first_block <= block_number { + } + if sender + .send( + block_number, + FinalityStatus::AcceptedOnL2, + Some(receipt.execution_status.clone()), + ) + .await + .is_err() + { + // Subscription closing. + break; + } + if let Some(l1_state) = l1_state { + if l1_state.block_number >= block_number { if sender .send( - block_number, - FinalityStatus::AcceptedOnL2, + l1_state.block_number, + FinalityStatus::AcceptedOnL1, Some(receipt.execution_status.clone()), ) .await @@ -195,22 +194,6 @@ impl RpcSubscriptionFlow for SubscribeTransactionStatus { break; } } - if let Some(l1_state) = l1_state { - if l1_state.block_number >= block_number { - if sender - .send( - l1_state.block_number, - FinalityStatus::AcceptedOnL1, - Some(receipt.execution_status.clone()), - ) - .await - .is_err() - { - // Subscription closing. - break; - } - } - } } } let pending = pending_data.borrow_and_update().clone(); @@ -478,7 +461,7 @@ mod tests { "finality_status": "RECEIVED", } }, - "subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V07)).unwrap(), + "subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V08)).unwrap(), } }), serde_json::json!({ @@ -492,7 +475,7 @@ mod tests { "execution_status": "SUCCEEDED", } }, - "subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V07)).unwrap(), + "subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V08)).unwrap(), } }) ] @@ -579,7 +562,7 @@ mod tests { "execution_status": "SUCCEEDED", } }, - "subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V07)).unwrap(), + "subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V08)).unwrap(), } }) ); @@ -608,7 +591,7 @@ mod tests { "finality_status": "RECEIVED", } }, - "subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V07)).unwrap(), + "subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V08)).unwrap(), } }), serde_json::json!({ @@ -623,7 +606,7 @@ mod tests { "failure_reason": "tx revert" } }, - "subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V07)).unwrap(), + "subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V08)).unwrap(), } }), ] @@ -653,7 +636,7 @@ mod tests { "finality_status": "RECEIVED", } }, - "subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V07)).unwrap(), + "subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V08)).unwrap(), } }), serde_json::json!({ @@ -667,7 +650,7 @@ mod tests { "execution_status": "SUCCEEDED" } }, - "subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V07)).unwrap(), + "subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V08)).unwrap(), } }), serde_json::json!({ @@ -681,7 +664,7 @@ mod tests { "execution_status": "SUCCEEDED" } }, - "subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V07)).unwrap(), + "subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V08)).unwrap(), } }), ] @@ -713,7 +696,7 @@ mod tests { "finality_status": "RECEIVED", } }, - "subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V07)).unwrap(), + "subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V08)).unwrap(), } }), serde_json::json!({ @@ -728,7 +711,7 @@ mod tests { "failure_reason": "tx revert" } }, - "subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V07)).unwrap(), + "subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V08)).unwrap(), } }), serde_json::json!({ @@ -743,7 +726,7 @@ mod tests { "failure_reason": "tx revert" } }, - "subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V07)).unwrap(), + "subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V08)).unwrap(), } }), ] @@ -943,7 +926,7 @@ mod tests { let (receiver_tx, receiver_rx) = mpsc::channel(1024); handle_json_rpc_socket(router.clone(), sender_tx, receiver_rx); let params = serde_json::json!( - {"block_id": {"block_number": 0}, "transaction_hash": tx_hash} + {"transaction_hash": tx_hash} ); receiver_tx .send(Ok(Message::Text( @@ -1097,7 +1080,7 @@ mod tests { let (receiver_tx, receiver_rx) = mpsc::channel(1024); handle_json_rpc_socket(router.clone(), sender_tx, receiver_rx); let params = serde_json::json!( - {"block_id": {"block_number": 0}, "transaction_hash": tx_hash} + {"transaction_hash": tx_hash} ); receiver_tx .send(Ok(Message::Text( @@ -1122,7 +1105,7 @@ mod tests { _ => panic!("Expected text message"), }; - let subscription_id = crate::dto::Value::new(subscription_id, RpcVersion::V07); + let subscription_id = crate::dto::Value::new(subscription_id, RpcVersion::V08); let subscription_id: SubscriptionId = subscription_id.deserialize().unwrap(); for msg in expected(subscription_id) { let status = sender_rx.recv().await.unwrap().unwrap(); diff --git a/doc/rpc/v08/starknet_ws_api.json b/doc/rpc/v08/starknet_ws_api.json index 609128b54a..a194e19da0 100644 --- a/doc/rpc/v08/starknet_ws_api.json +++ b/doc/rpc/v08/starknet_ws_api.json @@ -360,4 +360,4 @@ } } } -} \ No newline at end of file +}