Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(mempool): nonce ordering #450

Merged
merged 18 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .db-versions.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
current_version: 0
current_version: 1
versions:
- version: 1
pr: 450
- version: 0
pr: 372
3 changes: 0 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ madara.log
starknet-e2e-test/contracts/cache
starknet-e2e-test/contracts/build

# proptest report
**/proptest-regressions/

# vscode settings
.vscode/settings.json

Expand Down
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ bincode = "1.3"
fdlimit = "0.3.0"
proptest = "1.5.0"
proptest-derive = "0.5.0"
proptest-state-machine = "0.3.1"
dotenv = "0.15.0"
httpmock = "0.7.0"
tempfile = "3.10.1"
Expand Down
12 changes: 9 additions & 3 deletions crates/madara/client/block_production/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use mp_class::compile::ClassCompilationError;
use mp_class::ConvertedClass;
use mp_convert::ToFelt;
use mp_receipt::from_blockifier_execution_info;
use mp_state_update::{ContractStorageDiffItem, StateDiff, StorageEntry};
use mp_state_update::{ContractStorageDiffItem, NonceUpdate, StateDiff, StorageEntry};
use mp_transactions::TransactionWithHash;
use mp_utils::service::ServiceContext;
use opentelemetry::KeyValue;
Expand Down Expand Up @@ -207,7 +207,7 @@ impl<Mempool: MempoolProvider> BlockProductionTask<Mempool> {
let to_take = batch_size.saturating_sub(txs_to_process.len());
let cur_len = txs_to_process.len();
if to_take > 0 {
self.mempool.take_txs_chunk(/* extend */ &mut txs_to_process, batch_size);
self.mempool.txs_take_chunk(/* extend */ &mut txs_to_process, batch_size);

txs_to_process_blockifier.extend(txs_to_process.iter().skip(cur_len).map(|tx| tx.clone_tx()));
}
Expand Down Expand Up @@ -284,7 +284,7 @@ impl<Mempool: MempoolProvider> BlockProductionTask<Mempool> {
// Add back the unexecuted transactions to the mempool.
stats.n_re_added_to_mempool = txs_to_process.len();
self.mempool
.re_add_txs(txs_to_process, executed_txs)
.txs_re_add(txs_to_process, executed_txs)
.map_err(|err| Error::Unexpected(format!("Mempool error: {err:#}").into()))?;

tracing::debug!(
Expand Down Expand Up @@ -331,6 +331,12 @@ impl<Mempool: MempoolProvider> BlockProductionTask<Mempool> {
)
.await?;

// Removes nonces in the mempool nonce cache which have been included
// into the current block.
for NonceUpdate { contract_address, .. } in state_diff.nonces.iter() {
self.mempool.tx_mark_included(contract_address);
}

// Flush changes to disk
self.backend.flush().map_err(|err| BlockImportError::Internal(format!("DB flushing error: {err:#}").into()))?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use mc_db::{MadaraBackend, MadaraStorageError};
use mc_mempool::{MempoolProvider, MempoolTransaction};
use mp_block::{header::BlockTimestamp, BlockId, BlockTag, MadaraMaybePendingBlock};
use mp_transactions::{ToBlockifierError, TransactionWithHash};
use starknet_api::StarknetApiError;
use starknet_core::types::Felt;

#[derive(Debug, thiserror::Error)]
Expand All @@ -18,6 +19,9 @@ pub enum ReAddTxsToMempoolError {
#[error("Converting transaction with hash {tx_hash:#x}: Blockifier conversion error: {err:#}")]
ToBlockifierError { tx_hash: Felt, err: ToBlockifierError },

#[error("Error converting to a MempoolTransaction: {0:#}")]
ConvertToMempoolError(#[from] StarknetApiError),

/// This error should never happen unless we are running on a platform where SystemTime cannot represent the timestamp we are making.
#[error("Converting transaction with hash {tx_hash:#x}: Could not create arrived_at timestamp with block_timestamp={block_timestamp} and tx_index={tx_index}")]
MakingArrivedAtTimestamp { tx_hash: Felt, block_timestamp: BlockTimestamp, tx_index: usize },
Expand Down Expand Up @@ -68,14 +72,19 @@ pub fn re_add_txs_to_mempool(
let arrived_at = make_arrived_at(block_timestamp, tx_index).ok_or_else(|| {
ReAddTxsToMempoolError::MakingArrivedAtTimestamp { tx_hash, block_timestamp, tx_index }
})?;
Ok(MempoolTransaction { tx, arrived_at, converted_class })

Ok::<_, ReAddTxsToMempoolError>(MempoolTransaction::new_from_blockifier_tx(
tx,
arrived_at,
converted_class,
)?)
})
.collect::<Result<_, _>>()?;

let n = txs_to_reexec.len();

mempool
.insert_txs_no_validation(txs_to_reexec, /* force insertion */ true)
.txs_insert_no_validation(txs_to_reexec, /* force insertion */ true)
.expect("Mempool force insertion should never fail");

Ok(n)
Expand Down
4 changes: 2 additions & 2 deletions crates/madara/client/db/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,10 @@ fn parse_version(content: &str) -> Result<u32, BuildError> {
content
.lines()
.find(|line| line.starts_with("current_version:"))
.ok_or_else(|| BuildError::Parse(Cow::Borrowed("Could not find current_version")))?
.ok_or(BuildError::Parse(Cow::Borrowed("Could not find current_version")))?
.split(':')
.nth(1)
.ok_or_else(|| BuildError::Parse(Cow::Borrowed("Invalid current_version format")))?
.ok_or(BuildError::Parse(Cow::Borrowed("Invalid current_version format")))?
.trim()
.parse()
.map_err(|_| BuildError::Parse(Cow::Borrowed("Could not parse current_version as u32")))
Expand Down
11 changes: 10 additions & 1 deletion crates/madara/client/db/src/l1_db.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use rocksdb::WriteOptions;
use rocksdb::{IteratorMode, WriteOptions};
use serde::{Deserialize, Serialize};
use starknet_api::core::Nonce;

Expand Down Expand Up @@ -128,4 +128,13 @@ impl MadaraBackend {
self.db.put_cf_opt(&nonce_column, bincode::serialize(&nonce)?, /* empty value */ [], &writeopts)?;
Ok(())
}

/// Retrieve the latest L1 messaging [Nonce] if one is available, otherwise
/// returns [None].
pub fn get_l1_messaging_nonce_latest(&self) -> Result<Option<Nonce>, MadaraStorageError> {
let nonce_column = self.db.get_column(Column::L1MessagingNonce);
let mut iter = self.db.iterator_cf(&nonce_column, IteratorMode::End);
let nonce = iter.next().transpose()?.map(|(bytes, _)| bincode::deserialize(&bytes)).transpose()?;
Ok(nonce)
}
}
81 changes: 67 additions & 14 deletions crates/madara/client/db/src/mempool_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,57 @@ use crate::{Column, MadaraBackend, MadaraStorageError};
use mp_class::ConvertedClass;
use rocksdb::IteratorMode;
use serde::{Deserialize, Serialize};
use starknet_api::core::Nonce;
use starknet_types_core::felt::Felt;

type Result<T, E = MadaraStorageError> = std::result::Result<T, E>;

/// A nonce is deemed ready when it directly follows the previous nonce in db
/// for a contract address. This guarantees that dependent transactions are not
Trantorian1 marked this conversation as resolved.
Show resolved Hide resolved
/// executed out of order by the mempool.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub enum NonceStatus {
#[default]
Ready,
Pending,
}

/// Information used to assess the [readiness] of a transaction.
///
/// A transaction is deemed ready when its nonce directly follows the previous
/// nonce store in db for that contract address.
///
/// [nonce] and [nonce_next] are precomputed to avoid operating on a [Felt]
/// inside the hot path in the mempool.
///
/// [readiness]: NonceStatus
/// [nonce]: Self::nonce
/// [nonce_next]: Self::nonce_next
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct NonceInfo {
pub readiness: NonceStatus,
pub nonce: Nonce,
pub nonce_next: Nonce,
}

impl NonceInfo {
pub fn ready(nonce: Nonce, nonce_next: Nonce) -> Self {
debug_assert_eq!(nonce_next, nonce.try_increment().unwrap());
Self { readiness: NonceStatus::Ready, nonce, nonce_next }
}

pub fn pending(nonce: Nonce, nonce_next: Nonce) -> Self {
debug_assert_eq!(nonce_next, nonce.try_increment().unwrap());
Self { readiness: NonceStatus::Pending, nonce, nonce_next }
}
}

impl Default for NonceInfo {
fn default() -> Self {
Self::ready(Nonce(Felt::ZERO), Nonce(Felt::ONE))
}
}

#[derive(Serialize, Deserialize)]
pub struct SavedTransaction {
pub tx: mp_transactions::Transaction,
Expand All @@ -17,28 +64,33 @@ pub struct SavedTransaction {
}

#[derive(Serialize)]
struct TransactionWithConvertedClassRef<'a> {
tx: &'a SavedTransaction,
/// This struct is used as a template to serialize Mempool transactions from the
/// database without any further allocation.
struct DbMempoolTxInfoEncoder<'a> {
saved_tx: &'a SavedTransaction,
converted_class: &'a Option<ConvertedClass>,
nonce_info: &'a NonceInfo,
}
#[derive(Serialize, Deserialize)]
struct TransactionWithConvertedClass {
tx: SavedTransaction,
converted_class: Option<ConvertedClass>,

#[derive(Deserialize)]
/// This struct is used as a templace to deserialize Mempool transactions from
Trantorian1 marked this conversation as resolved.
Show resolved Hide resolved
/// the database.
pub struct DbMempoolTxInfoDecoder {
pub saved_tx: SavedTransaction,
pub converted_class: Option<ConvertedClass>,
pub nonce_readiness: NonceInfo,
}

impl MadaraBackend {
#[tracing::instrument(skip(self), fields(module = "MempoolDB"))]
pub fn get_mempool_transactions(
&self,
) -> impl Iterator<Item = Result<(Felt, SavedTransaction, Option<ConvertedClass>)>> + '_ {
pub fn get_mempool_transactions(&self) -> impl Iterator<Item = Result<(Felt, DbMempoolTxInfoDecoder)>> + '_ {
let col = self.db.get_column(Column::MempoolTransactions);
self.db.iterator_cf(&col, IteratorMode::Start).map(|kv| {
let (k, v) = kv?;
let hash: Felt = bincode::deserialize(&k)?;
let tx: TransactionWithConvertedClass = bincode::deserialize(&v)?;
let tx_info: DbMempoolTxInfoDecoder = bincode::deserialize(&v)?;

Result::<_>::Ok((hash, tx.tx, tx.converted_class))
Result::<_>::Ok((hash, tx_info))
})
}

Expand All @@ -54,18 +106,19 @@ impl MadaraBackend {
Ok(())
}

#[tracing::instrument(skip(self, tx), fields(module = "MempoolDB"))]
#[tracing::instrument(skip(self, saved_tx), fields(module = "MempoolDB"))]
pub fn save_mempool_transaction(
&self,
tx: &SavedTransaction,
saved_tx: &SavedTransaction,
tx_hash: Felt,
converted_class: &Option<ConvertedClass>,
nonce_info: &NonceInfo,
) -> Result<()> {
// Note: WAL is used here
// This is because we want it to be saved even if the node crashes before the next flush

let col = self.db.get_column(Column::MempoolTransactions);
let tx_with_class = TransactionWithConvertedClassRef { tx, converted_class };
let tx_with_class = DbMempoolTxInfoEncoder { saved_tx, converted_class, nonce_info };
self.db.put_cf(&col, bincode::serialize(&tx_hash)?, bincode::serialize(&tx_with_class)?)?;
tracing::debug!("save_mempool_tx {:?}", tx_hash);
Ok(())
Expand Down
14 changes: 7 additions & 7 deletions crates/madara/client/devnet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ mod tests {
&self,
mut tx: BroadcastedInvokeTxn<Felt>,
contract: &DevnetPredeployedContract,
) -> Result<AddInvokeTransactionResult<Felt>, mc_mempool::Error> {
) -> Result<AddInvokeTransactionResult<Felt>, mc_mempool::MempoolError> {
let (blockifier_tx, _classes) = BroadcastedTxn::Invoke(tx.clone())
.into_blockifier(
self.backend.chain_config().chain_id.to_felt(),
Expand All @@ -238,14 +238,14 @@ mod tests {

tracing::debug!("tx: {:?}", tx);

self.mempool.accept_invoke_tx(tx)
self.mempool.tx_accept_invoke(tx)
}

pub fn sign_and_add_declare_tx(
&self,
mut tx: BroadcastedDeclareTxn<Felt>,
contract: &DevnetPredeployedContract,
) -> Result<ClassAndTxnHash<Felt>, mc_mempool::Error> {
) -> Result<ClassAndTxnHash<Felt>, mc_mempool::MempoolError> {
let (blockifier_tx, _classes) = BroadcastedTxn::Declare(tx.clone())
.into_blockifier(
self.backend.chain_config().chain_id.to_felt(),
Expand All @@ -262,14 +262,14 @@ mod tests {
};
*tx_signature = vec![signature.r, signature.s];

self.mempool.accept_declare_tx(tx)
self.mempool.tx_accept_declare(tx)
}

pub fn sign_and_add_deploy_account_tx(
&self,
mut tx: BroadcastedDeployAccountTxn<Felt>,
contract: &DevnetPredeployedContract,
) -> Result<ContractAndTxnHash<Felt>, mc_mempool::Error> {
) -> Result<ContractAndTxnHash<Felt>, mc_mempool::MempoolError> {
let (blockifier_tx, _classes) = BroadcastedTxn::DeployAccount(tx.clone())
.into_blockifier(
self.backend.chain_config().chain_id.to_felt(),
Expand All @@ -285,7 +285,7 @@ mod tests {
};
*tx_signature = vec![signature.r, signature.s];

self.mempool.accept_deploy_account_tx(tx)
self.mempool.tx_accept_deploy_account(tx)
}

/// (STRK in FRI, ETH in WEI)
Expand Down Expand Up @@ -716,7 +716,7 @@ mod tests {

assert_matches!(
result,
Err(mc_mempool::Error::InnerMempool(mc_mempool::TxInsersionError::Limit(
Err(mc_mempool::MempoolError::InnerMempool(mc_mempool::TxInsertionError::Limit(
mc_mempool::MempoolLimitReached::MaxTransactions { max: 5 }
)))
)
Expand Down
2 changes: 1 addition & 1 deletion crates/madara/client/eth/src/l1_messaging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ async fn process_l1_message(
}
};

let res = mempool.accept_l1_handler_tx(transaction.into(), fees)?;
let res = mempool.tx_accept_l1_handler(transaction.into(), fees)?;

// TODO: remove unwraps
// Ques: shall it panic if no block number of event_index?
Expand Down
2 changes: 2 additions & 0 deletions crates/madara/client/mempool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mc-db = { workspace = true, features = ["testing"] }
tokio = { workspace = true, features = ["rt-multi-thread"] }
proptest.workspace = true
proptest-derive.workspace = true
proptest-state-machine.workspace = true
bitvec.workspace = true
tracing = { workspace = true, features = ["log"] }
tracing-test.workspace = true
Expand All @@ -29,6 +30,7 @@ mockall.workspace = true
assert_matches.workspace = true
lazy_static.workspace = true
serde_json.workspace = true
starknet-types-core = { workspace = true, features = ["arbitrary"] }

[features]
testing = ["blockifier/testing", "mc-db/testing", "mockall"]
Expand Down
Loading