Skip to content

Commit

Permalink
Merge pull request #2501 from eqlabs/vbar/remove-block-id
Browse files Browse the repository at this point in the history
fix: remove block_id from starknet_subscribeTransactionStatus
  • Loading branch information
vbar authored Jan 22, 2025
2 parents f3edbd7 + a977074 commit ea5bb52
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 75 deletions.
131 changes: 57 additions & 74 deletions crates/rpc/src/method/subscribe_transaction_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::time::Duration;

use axum::async_trait;
use pathfinder_common::receipt::ExecutionStatus;
use pathfinder_common::{BlockId, BlockNumber, TransactionHash};
use pathfinder_common::{BlockNumber, TransactionHash};
use reply::transaction_status as status;
use starknet_gateway_client::GatewayApi;
use starknet_gateway_types::reply;
Expand All @@ -12,7 +12,6 @@ use tokio::time::MissedTickBehavior;

use super::REORG_SUBSCRIPTION_NAME;
use crate::context::RpcContext;
use crate::error::ApplicationError;
use crate::jsonrpc::{RpcError, RpcSubscriptionFlow, SubscriptionMessage};
use crate::Reorg;

Expand All @@ -21,15 +20,13 @@ pub struct SubscribeTransactionStatus;
#[derive(Debug, Clone, Default)]
pub struct Params {
transaction_hash: TransactionHash,
block_id: Option<BlockId>,
}

impl crate::dto::DeserializeForVersion for Params {
fn deserialize(value: crate::dto::Value) -> Result<Self, serde_json::Error> {
value.deserialize_map(|value| {
Ok(Self {
transaction_hash: value.deserialize("transaction_hash").map(TransactionHash)?,
block_id: value.deserialize_optional_serde("block_id")?,
})
})
}
Expand Down Expand Up @@ -143,49 +140,51 @@ impl RpcSubscriptionFlow for SubscribeTransactionStatus {
let mut l2_blocks = state.notifications.l2_blocks.subscribe();
let mut reorgs = state.notifications.reorgs.subscribe();
let storage = state.storage.clone();
if let Some(first_block) = params.block_id {
// Check if we have the transaction in our database, and if so, send the
// relevant transaction status updates.
let (first_block, l1_state, tx_with_receipt) =
util::task::spawn_blocking(move |_| -> Result<_, RpcError> {
let mut conn = storage.connection().map_err(RpcError::InternalError)?;
let db = conn.transaction().map_err(RpcError::InternalError)?;
let first_block = db
.block_number(first_block.try_into().map_err(|_| {
RpcError::ApplicationError(ApplicationError::CallOnPending)
})?)
.map_err(RpcError::InternalError)?;
let l1_block_number =
db.latest_l1_state().map_err(RpcError::InternalError)?;
let tx_with_receipt = db
.transaction_with_receipt(tx_hash)
.map_err(RpcError::InternalError)?;
Ok((first_block, l1_block_number, tx_with_receipt))
})
.await
.map_err(|e| RpcError::InternalError(e.into()))??;
let first_block = first_block
.ok_or_else(|| RpcError::ApplicationError(ApplicationError::BlockNotFound))?;
if let Some((_, receipt, _, block_number)) = tx_with_receipt {
// We already have the transaction in the database.
if let Some(parent) = block_number.parent() {
// This transaction was pending in the parent block.
if first_block <= parent {
if sender
.send(parent, FinalityStatus::Received, None)
.await
.is_err()
{
// Subscription closing.
break;
}
}
// Check if we have the transaction in our database, and if so, send the
// relevant transaction status updates.
let (l1_state, tx_with_receipt) =
util::task::spawn_blocking(move |_| -> Result<_, RpcError> {
let mut conn = storage.connection().map_err(RpcError::InternalError)?;
let db = conn.transaction().map_err(RpcError::InternalError)?;
let l1_block_number = db.latest_l1_state().map_err(RpcError::InternalError)?;
let tx_with_receipt = db
.transaction_with_receipt(tx_hash)
.map_err(RpcError::InternalError)?;
Ok((l1_block_number, tx_with_receipt))
})
.await
.map_err(|e| RpcError::InternalError(e.into()))??;
if let Some((_, receipt, _, block_number)) = tx_with_receipt {
// We already have the transaction in the database.
if let Some(parent) = block_number.parent() {
// This transaction was pending in the parent block.
if sender
.send(parent, FinalityStatus::Received, None)
.await
.is_err()
{
// Subscription closing.
break;
}
if first_block <= block_number {
}
if sender
.send(
block_number,
FinalityStatus::AcceptedOnL2,
Some(receipt.execution_status.clone()),
)
.await
.is_err()
{
// Subscription closing.
break;
}
if let Some(l1_state) = l1_state {
if l1_state.block_number >= block_number {
if sender
.send(
block_number,
FinalityStatus::AcceptedOnL2,
l1_state.block_number,
FinalityStatus::AcceptedOnL1,
Some(receipt.execution_status.clone()),
)
.await
Expand All @@ -195,22 +194,6 @@ impl RpcSubscriptionFlow for SubscribeTransactionStatus {
break;
}
}
if let Some(l1_state) = l1_state {
if l1_state.block_number >= block_number {
if sender
.send(
l1_state.block_number,
FinalityStatus::AcceptedOnL1,
Some(receipt.execution_status.clone()),
)
.await
.is_err()
{
// Subscription closing.
break;
}
}
}
}
}
let pending = pending_data.borrow_and_update().clone();
Expand Down Expand Up @@ -478,7 +461,7 @@ mod tests {
"finality_status": "RECEIVED",
}
},
"subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V07)).unwrap(),
"subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V08)).unwrap(),
}
}),
serde_json::json!({
Expand All @@ -492,7 +475,7 @@ mod tests {
"execution_status": "SUCCEEDED",
}
},
"subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V07)).unwrap(),
"subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V08)).unwrap(),
}
})
]
Expand Down Expand Up @@ -579,7 +562,7 @@ mod tests {
"execution_status": "SUCCEEDED",
}
},
"subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V07)).unwrap(),
"subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V08)).unwrap(),
}
})
);
Expand Down Expand Up @@ -608,7 +591,7 @@ mod tests {
"finality_status": "RECEIVED",
}
},
"subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V07)).unwrap(),
"subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V08)).unwrap(),
}
}),
serde_json::json!({
Expand All @@ -623,7 +606,7 @@ mod tests {
"failure_reason": "tx revert"
}
},
"subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V07)).unwrap(),
"subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V08)).unwrap(),
}
}),
]
Expand Down Expand Up @@ -653,7 +636,7 @@ mod tests {
"finality_status": "RECEIVED",
}
},
"subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V07)).unwrap(),
"subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V08)).unwrap(),
}
}),
serde_json::json!({
Expand All @@ -667,7 +650,7 @@ mod tests {
"execution_status": "SUCCEEDED"
}
},
"subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V07)).unwrap(),
"subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V08)).unwrap(),
}
}),
serde_json::json!({
Expand All @@ -681,7 +664,7 @@ mod tests {
"execution_status": "SUCCEEDED"
}
},
"subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V07)).unwrap(),
"subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V08)).unwrap(),
}
}),
]
Expand Down Expand Up @@ -713,7 +696,7 @@ mod tests {
"finality_status": "RECEIVED",
}
},
"subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V07)).unwrap(),
"subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V08)).unwrap(),
}
}),
serde_json::json!({
Expand All @@ -728,7 +711,7 @@ mod tests {
"failure_reason": "tx revert"
}
},
"subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V07)).unwrap(),
"subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V08)).unwrap(),
}
}),
serde_json::json!({
Expand All @@ -743,7 +726,7 @@ mod tests {
"failure_reason": "tx revert"
}
},
"subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V07)).unwrap(),
"subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V08)).unwrap(),
}
}),
]
Expand Down Expand Up @@ -943,7 +926,7 @@ mod tests {
let (receiver_tx, receiver_rx) = mpsc::channel(1024);
handle_json_rpc_socket(router.clone(), sender_tx, receiver_rx);
let params = serde_json::json!(
{"block_id": {"block_number": 0}, "transaction_hash": tx_hash}
{"transaction_hash": tx_hash}
);
receiver_tx
.send(Ok(Message::Text(
Expand Down Expand Up @@ -1097,7 +1080,7 @@ mod tests {
let (receiver_tx, receiver_rx) = mpsc::channel(1024);
handle_json_rpc_socket(router.clone(), sender_tx, receiver_rx);
let params = serde_json::json!(
{"block_id": {"block_number": 0}, "transaction_hash": tx_hash}
{"transaction_hash": tx_hash}
);
receiver_tx
.send(Ok(Message::Text(
Expand All @@ -1122,7 +1105,7 @@ mod tests {
_ => panic!("Expected text message"),
};

let subscription_id = crate::dto::Value::new(subscription_id, RpcVersion::V07);
let subscription_id = crate::dto::Value::new(subscription_id, RpcVersion::V08);
let subscription_id: SubscriptionId = subscription_id.deserialize().unwrap();
for msg in expected(subscription_id) {
let status = sender_rx.recv().await.unwrap().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion doc/rpc/v08/starknet_ws_api.json
Original file line number Diff line number Diff line change
Expand Up @@ -360,4 +360,4 @@
}
}
}
}
}

0 comments on commit ea5bb52

Please sign in to comment.