diff --git a/.sqlx/query-39d3fae6fdd67a2324fae4d5e828f69f2298cd5b0f7eb1609ed189269c6f677c.json b/.sqlx/query-39d3fae6fdd67a2324fae4d5e828f69f2298cd5b0f7eb1609ed189269c6f677c.json new file mode 100644 index 00000000..abd5fc53 --- /dev/null +++ b/.sqlx/query-39d3fae6fdd67a2324fae4d5e828f69f2298cd5b0f7eb1609ed189269c6f677c.json @@ -0,0 +1,58 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n bc.bundle_id,\n bc.cost,\n bc.size,\n bc.da_block_height,\n bc.is_finalized,\n b.start_height,\n b.end_height\n FROM\n bundle_cost bc\n JOIN bundles b ON bc.bundle_id = b.id\n WHERE\n bc.is_finalized = TRUE\n ORDER BY\n b.start_height DESC\n LIMIT $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "bundle_id", + "type_info": "Int4" + }, + { + "ordinal": 1, + "name": "cost", + "type_info": "Numeric" + }, + { + "ordinal": 2, + "name": "size", + "type_info": "Int8" + }, + { + "ordinal": 3, + "name": "da_block_height", + "type_info": "Int8" + }, + { + "ordinal": 4, + "name": "is_finalized", + "type_info": "Bool" + }, + { + "ordinal": 5, + "name": "start_height", + "type_info": "Int8" + }, + { + "ordinal": 6, + "name": "end_height", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false + ] + }, + "hash": "39d3fae6fdd67a2324fae4d5e828f69f2298cd5b0f7eb1609ed189269c6f677c" +} diff --git a/.sqlx/query-126284fed623566f0551d4e6a343ddbd8800dd6c27165f89fc72970fe8a89147.json b/.sqlx/query-ddc1a18d0d257b9065830b46a10ce42fee96b0925eb2c30a0b98cf9f79c6ed76.json similarity index 58% rename from .sqlx/query-126284fed623566f0551d4e6a343ddbd8800dd6c27165f89fc72970fe8a89147.json rename to .sqlx/query-ddc1a18d0d257b9065830b46a10ce42fee96b0925eb2c30a0b98cf9f79c6ed76.json index 0b8b6451..86a298a0 100644 --- a/.sqlx/query-126284fed623566f0551d4e6a343ddbd8800dd6c27165f89fc72970fe8a89147.json +++ b/.sqlx/query-ddc1a18d0d257b9065830b46a10ce42fee96b0925eb2c30a0b98cf9f79c6ed76.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT f.*\n FROM l1_fragments f\n JOIN l1_transaction_fragments tf ON tf.fragment_id = f.id\n JOIN l1_blob_transaction t ON t.id = tf.transaction_id\n WHERE t.hash = $1\n ", + "query": "\n SELECT\n f.*,\n b.start_height\n FROM l1_fragments f\n JOIN l1_transaction_fragments tf ON tf.fragment_id = f.id\n JOIN l1_blob_transaction t ON t.id = tf.transaction_id\n JOIN bundles b ON b.id = f.bundle_id\n WHERE t.hash = $1\n ", "describe": { "columns": [ { @@ -32,6 +32,11 @@ "ordinal": 5, "name": "bundle_id", "type_info": "Int4" + }, + { + "ordinal": 6, + "name": "start_height", + "type_info": "Int8" } ], "parameters": { @@ -45,8 +50,9 @@ false, false, false, + false, false ] }, - "hash": "126284fed623566f0551d4e6a343ddbd8800dd6c27165f89fc72970fe8a89147" + "hash": "ddc1a18d0d257b9065830b46a10ce42fee96b0925eb2c30a0b98cf9f79c6ed76" } diff --git a/.sqlx/query-11c3dc9c06523c39e928bfc1c2947309b2f92155b5d2198e39b42f687cc58f40.json b/.sqlx/query-ed56ffeb0264867943f7891de21ff99a2bfb27dd1e51d0f877f939e29b7f3a52.json similarity index 51% rename from .sqlx/query-11c3dc9c06523c39e928bfc1c2947309b2f92155b5d2198e39b42f687cc58f40.json rename to .sqlx/query-ed56ffeb0264867943f7891de21ff99a2bfb27dd1e51d0f877f939e29b7f3a52.json index 2fd79840..9fe76b57 100644 --- a/.sqlx/query-11c3dc9c06523c39e928bfc1c2947309b2f92155b5d2198e39b42f687cc58f40.json +++ b/.sqlx/query-ed56ffeb0264867943f7891de21ff99a2bfb27dd1e51d0f877f939e29b7f3a52.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT\n 
sub.id,\n sub.idx,\n sub.bundle_id,\n sub.data,\n sub.unused_bytes,\n sub.total_bytes\n FROM (\n SELECT DISTINCT ON (f.id)\n f.*,\n b.start_height\n FROM l1_fragments f\n JOIN bundles b ON b.id = f.bundle_id\n WHERE\n b.end_height >= $2\n AND NOT EXISTS (\n SELECT 1\n FROM l1_transaction_fragments tf\n JOIN l1_blob_transaction t ON t.id = tf.transaction_id\n WHERE tf.fragment_id = f.id\n AND t.state <> $1\n )\n ORDER BY\n f.id,\n b.start_height ASC,\n f.idx ASC\n ) AS sub\n ORDER BY\n sub.start_height ASC,\n sub.idx ASC\n LIMIT $3;\n", + "query": "SELECT\n sub.id,\n sub.idx,\n sub.bundle_id,\n sub.data,\n sub.unused_bytes,\n sub.total_bytes,\n sub.start_height\n FROM (\n SELECT DISTINCT ON (f.id)\n f.*,\n b.start_height\n FROM l1_fragments f\n JOIN bundles b ON b.id = f.bundle_id\n WHERE\n b.end_height >= $2\n AND NOT EXISTS (\n SELECT 1\n FROM l1_transaction_fragments tf\n JOIN l1_blob_transaction t ON t.id = tf.transaction_id\n WHERE tf.fragment_id = f.id\n AND t.state <> $1\n )\n ORDER BY\n f.id,\n b.start_height ASC,\n f.idx ASC\n ) AS sub\n ORDER BY\n sub.start_height ASC,\n sub.idx ASC\n LIMIT $3;\n", "describe": { "columns": [ { @@ -32,6 +32,11 @@ "ordinal": 5, "name": "total_bytes", "type_info": "Int8" + }, + { + "ordinal": 6, + "name": "start_height", + "type_info": "Int8" } ], "parameters": { @@ -47,8 +52,9 @@ false, false, false, + false, false ] }, - "hash": "11c3dc9c06523c39e928bfc1c2947309b2f92155b5d2198e39b42f687cc58f40" + "hash": "ed56ffeb0264867943f7891de21ff99a2bfb27dd1e51d0f877f939e29b7f3a52" } diff --git a/Cargo.lock b/Cargo.lock index 6b99e671..84196586 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2146,6 +2146,27 @@ dependencies = [ "typenum", ] +[[package]] +name = "csv" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acdc4883a9c96732e4733212c01447ebd805833b7275a73ca3ee080fd77afdaf" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70" +dependencies = [ + "memchr", +] + [[package]] name = "curve25519-dalek" version = "4.1.3" @@ -2673,6 +2694,7 @@ dependencies = [ "serde", "serde_json", "services", + "static_assertions", "test-case", "thiserror 1.0.69", "tokio", @@ -5761,6 +5783,7 @@ dependencies = [ "async-trait", "bytesize", "clock", + "csv", "delegate", "eth", "fuel-block-committer-encoding", @@ -5773,6 +5796,7 @@ dependencies = [ "mockall", "nonempty", "pretty_assertions", + "proptest", "rand", "rayon", "serde", diff --git a/committer/src/api.rs b/committer/src/api.rs index cd97a59c..5393ba0a 100644 --- a/committer/src/api.rs +++ b/committer/src/api.rs @@ -90,9 +90,17 @@ async fn metrics(registry: web::Data>) -> impl Responder { std::result::Result::<_, InternalError<_>>::Ok(text) } +#[derive(Deserialize)] +#[serde(rename_all = "lowercase")] +enum HeightVariant { + Latest, + Specific, +} + #[derive(Deserialize)] struct CostQueryParams { - from_height: u32, + variant: HeightVariant, + value: Option, limit: Option, } @@ -103,8 +111,18 @@ async fn costs( ) -> impl Responder { let limit = query.limit.unwrap_or(100); - match data.get_costs(query.from_height, limit).await { - Ok(bundle_costs) => HttpResponse::Ok().json(bundle_costs), + let response = match query.variant { + HeightVariant::Latest => data.get_latest_costs(limit).await, + HeightVariant::Specific => match query.value { + Some(height) => 
data.get_costs(height, limit).await, + None => Err(services::Error::Other( + "height value is required".to_string(), + )), + }, + }; + + match response { + Ok(costs) => HttpResponse::Ok().json(costs), Err(services::Error::Other(e)) => { HttpResponse::from_error(InternalError::new(e, StatusCode::BAD_REQUEST)) } diff --git a/committer/src/config.rs b/committer/src/config.rs index fbc6cf6b..eca2d76e 100644 --- a/committer/src/config.rs +++ b/committer/src/config.rs @@ -1,6 +1,6 @@ use std::{ net::Ipv4Addr, - num::{NonZeroU32, NonZeroUsize}, + num::{NonZeroU32, NonZeroU64, NonZeroUsize}, str::FromStr, time::Duration, }; @@ -93,6 +93,9 @@ pub struct App { /// How often to check for finalized l1 txs #[serde(deserialize_with = "human_readable_duration")] pub tx_finalization_check_interval: Duration, + /// How often to check for l1 prices + #[serde(deserialize_with = "human_readable_duration")] + pub l1_prices_check_interval: Duration, /// Number of L1 blocks that need to pass to accept the tx as finalized pub num_blocks_to_finalize_tx: u64, /// Interval after which to bump a pending tx @@ -111,6 +114,30 @@ pub struct App { /// How often to run state pruner #[serde(deserialize_with = "human_readable_duration")] pub state_pruner_run_interval: Duration, + /// Configuration for the fee algorithm used by the StateCommitter + pub fee_algo: FeeAlgoConfig, +} + +/// Configuration for the fee algorithm used by the StateCommitter +#[derive(Debug, Clone, Deserialize)] +pub struct FeeAlgoConfig { + /// Short-term period for Simple Moving Average (SMA) in block numbers + pub short_sma_blocks: NonZeroU64, + + /// Long-term period for Simple Moving Average (SMA) in block numbers + pub long_sma_blocks: NonZeroU64, + + /// Maximum number of unposted L2 blocks before sending a transaction regardless of fees + pub max_l2_blocks_behind: NonZeroU32, + + /// Starting discount percentage applied we try to achieve if we're 0 l2 blocks behind + pub start_discount_percentage: f64, + + /// Premium percentage we're willing to pay if we're max_l2_blocks_behind - 1 blocks behind + pub end_premium_percentage: f64, + + /// A fee that is always acceptable regardless of other conditions + pub always_acceptable_fee: u64, } /// Configuration settings for managing fuel block bundling and fragment submission operations. 
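
Editorial note (illustrative, not part of the patch): with the HeightVariant parameters added to the costs handler above, the endpoint now serves two kinds of requests that differ only in the query string, for example ?variant=latest&limit=5 for the most recently finalized bundle costs and ?variant=specific&value=1200&limit=10 for costs relative to a given block height (the height 1200 is invented for illustration). value is required whenever variant=specific, and limit falls back to 100 when omitted.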
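The fee_algo thresholds above describe a sliding acceptance band: assuming the linear behaviour the field comments imply, a configuration with start_discount_percentage = 0.2, end_premium_percentage = 0.2 and max_l2_blocks_behind = 100 would target roughly 20% below the long-term SMA fee when no L2 blocks are pending and drift toward 20% above it as the backlog approaches 100 blocks. Independently of that band, always_acceptable_fee short-circuits the check for sufficiently cheap transactions, and falling behind by more than max_l2_blocks_behind forces a send regardless of fees. Concrete values used in tests appear as the COMMITTER__APP__FEE_ALGO__* environment variables in e2e/src/committer.rs further down.
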
diff --git a/committer/src/main.rs b/committer/src/main.rs index 9b9bf2d6..202ecece 100644 --- a/committer/src/main.rs +++ b/committer/src/main.rs @@ -7,6 +7,7 @@ mod setup; use api::launch_api_server; use errors::{Result, WithContext}; use metrics::prometheus::Registry; +use services::fee_tracker::port::cache::CachingApi; use setup::last_finalization_metric; use tokio_util::sync::CancellationToken; @@ -72,13 +73,22 @@ async fn main() -> Result<()> { &metrics_registry, ); + let (fee_tracker, fee_tracker_handle) = setup::fee_tracker( + ethereum_rpc.clone(), + cancel_token.clone(), + &config, + &metrics_registry, + )?; + let state_committer_handle = setup::state_committer( fuel_adapter.clone(), ethereum_rpc.clone(), storage.clone(), cancel_token.clone(), &config, - ); + &metrics_registry, + fee_tracker, + )?; let state_importer_handle = setup::block_importer(fuel_adapter, storage.clone(), cancel_token.clone(), &config); @@ -105,6 +115,7 @@ async fn main() -> Result<()> { handles.push(state_importer_handle); handles.push(block_bundler); handles.push(state_listener_handle); + handles.push(fee_tracker_handle); // Enable pruner once the issue is resolved //TODO: https://github.com/FuelLabs/fuel-block-committer/issues/173 // handles.push(state_pruner_handle); diff --git a/committer/src/setup.rs b/committer/src/setup.rs index 3101e8fe..8e2fcd18 100644 --- a/committer/src/setup.rs +++ b/committer/src/setup.rs @@ -9,6 +9,10 @@ use metrics::{ }; use services::{ block_committer::{port::l1::Contract, service::BlockCommitter}, + fee_tracker::{ + port::cache::CachingApi, + service::{FeeThresholds, FeeTracker, SmaPeriods}, + }, state_committer::port::Storage, state_listener::service::StateListener, state_pruner::service::StatePruner, @@ -117,7 +121,9 @@ pub fn state_committer( storage: Database, cancel_token: CancellationToken, config: &config::Config, -) -> tokio::task::JoinHandle<()> { + registry: &Registry, + fee_tracker: FeeTracker>, +) -> Result> { let state_committer = services::StateCommitter::new( l1, fuel, @@ -127,17 +133,19 @@ pub fn state_committer( fragment_accumulation_timeout: config.app.bundle.fragment_accumulation_timeout, fragments_to_accumulate: config.app.bundle.fragments_to_accumulate, gas_bump_timeout: config.app.gas_bump_timeout, - tx_max_fee: config.app.tx_max_fee as u128, }, SystemClock, + fee_tracker, ); - schedule_polling( + state_committer.register_metrics(registry); + + Ok(schedule_polling( config.app.tx_finalization_check_interval, state_committer, "State Committer", cancel_token, - ) + )) } pub fn block_importer( @@ -316,3 +324,41 @@ pub async fn shut_down( storage.close().await; Ok(()) } + +pub fn fee_tracker( + l1: L1, + cancel_token: CancellationToken, + config: &config::Config, + registry: &Registry, +) -> Result<(FeeTracker>, tokio::task::JoinHandle<()>)> { + let fee_tracker = FeeTracker::new( + CachingApi::new(l1, 24 * 3600 / 12), + services::fee_tracker::service::Config { + sma_periods: SmaPeriods { + short: config.app.fee_algo.short_sma_blocks, + long: config.app.fee_algo.long_sma_blocks, + }, + fee_thresholds: FeeThresholds { + max_l2_blocks_behind: config.app.fee_algo.max_l2_blocks_behind, + start_discount_percentage: config + .app + .fee_algo + .start_discount_percentage + .try_into()?, + end_premium_percentage: config.app.fee_algo.end_premium_percentage.try_into()?, + always_acceptable_fee: config.app.fee_algo.always_acceptable_fee as u128, + }, + }, + ); + + fee_tracker.register_metrics(registry); + + let handle = schedule_polling( + 
config.app.tx_finalization_check_interval, + fee_tracker.clone(), + "Fee Tracker", + cancel_token, + ); + + Ok((fee_tracker, handle)) +} diff --git a/e2e/src/committer.rs b/e2e/src/committer.rs index ada77606..e6a6f171 100644 --- a/e2e/src/committer.rs +++ b/e2e/src/committer.rs @@ -73,6 +73,7 @@ impl Committer { .env("COMMITTER__APP__HOST", "127.0.0.1") .env("COMMITTER__APP__BLOCK_CHECK_INTERVAL", "5s") .env("COMMITTER__APP__TX_FINALIZATION_CHECK_INTERVAL", "5s") + .env("COMMITTER__APP__L1_PRICES_CHECK_INTERVAL", "5s") .env("COMMITTER__APP__NUM_BLOCKS_TO_FINALIZE_TX", "3") .env("COMMITTER__APP__GAS_BUMP_TIMEOUT", "300s") .env("COMMITTER__APP__TX_MAX_FEE", "4000000000000000") @@ -121,6 +122,16 @@ impl Committer { "COMMITTER__APP__STATE_PRUNER_RUN_INTERVAL", get_field!(state_pruner_run_interval), ) + .env("COMMITTER__APP__FEE_ALGO__SHORT_SMA_BLOCKS", "1") + .env("COMMITTER__APP__FEE_ALGO__LONG_SMA_BLOCKS", "1") + .env("COMMITTER__APP__FEE_ALGO__MAX_L2_BLOCKS_BEHIND", "1") + .env("COMMITTER__APP__FEE_ALGO__START_DISCOUNT_PERCENTAGE", "0") + .env("COMMITTER__APP__FEE_ALGO__END_PREMIUM_PERCENTAGE", "0") + // we're basically disabling the fee algo here + .env( + "COMMITTER__APP__FEE_ALGO__ALWAYS_ACCEPTABLE_FEE", + u64::MAX.to_string(), + ) .current_dir(Path::new(env!("CARGO_MANIFEST_DIR")).parent().unwrap()) .kill_on_drop(true); diff --git a/packages/adapters/eth/Cargo.toml b/packages/adapters/eth/Cargo.toml index 6c18d2c9..0ab31d3b 100644 --- a/packages/adapters/eth/Cargo.toml +++ b/packages/adapters/eth/Cargo.toml @@ -21,6 +21,7 @@ alloy = { workspace = true, features = [ "rpc-types", "reqwest-rustls-tls", ] } +static_assertions = { workspace = true } async-trait = { workspace = true } aws-config = { workspace = true, features = ["default"] } aws-sdk-kms = { workspace = true, features = ["default"] } diff --git a/packages/adapters/eth/src/fee_conversion.rs b/packages/adapters/eth/src/fee_conversion.rs new file mode 100644 index 00000000..0d34073c --- /dev/null +++ b/packages/adapters/eth/src/fee_conversion.rs @@ -0,0 +1,531 @@ +use std::ops::RangeInclusive; + +use alloy::rpc::types::FeeHistory; +use itertools::{izip, Itertools}; +use services::{ + fee_tracker::port::l1::{BlockFees, Fees}, + Result, +}; + +pub fn unpack_fee_history(fees: FeeHistory) -> Result> { + let number_of_blocks = if fees.base_fee_per_gas.is_empty() { + 0 + } else { + // We subtract 1 because the last element is the expected fee for the next block + fees.base_fee_per_gas + .len() + .checked_sub(1) + .expect("checked not 0") + }; + + if number_of_blocks == 0 { + return Ok(vec![]); + } + + let Some(nested_rewards) = fees.reward.as_ref() else { + return Err(services::Error::Other(format!( + "missing rewards field: {fees:?}" + ))); + }; + + if number_of_blocks != nested_rewards.len() + || number_of_blocks != fees.base_fee_per_blob_gas.len() - 1 + { + return Err(services::Error::Other(format!( + "discrepancy in lengths of fee fields: {fees:?}" + ))); + } + + let rewards: Vec<_> = nested_rewards + .iter() + .map(|perc| { + perc.last().copied().ok_or_else(|| { + crate::error::Error::Other( + "should have had at least one reward percentile".to_string(), + ) + }) + }) + .try_collect()?; + + let values = izip!( + (fees.oldest_block..), + fees.base_fee_per_gas.into_iter(), + fees.base_fee_per_blob_gas.into_iter(), + rewards + ) + .take(number_of_blocks) + .map( + |(height, base_fee_per_gas, base_fee_per_blob_gas, reward)| BlockFees { + height, + fees: Fees { + base_fee_per_gas, + reward, + base_fee_per_blob_gas, + }, + }, + ) + 
.collect(); + + Ok(values) +} + +pub fn chunk_range_inclusive( + initial_range: RangeInclusive, + chunk_size: u64, +) -> Vec> { + let mut ranges = Vec::new(); + + if chunk_size == 0 { + return ranges; + } + + let start = *initial_range.start(); + let end = *initial_range.end(); + + let mut current = start; + while current <= end { + // Calculate the end of the current chunk. + let chunk_end = (current + chunk_size - 1).min(end); + + ranges.push(current..=chunk_end); + + current = chunk_end + 1; + } + + ranges +} + +#[cfg(test)] +mod test { + use alloy::rpc::types::FeeHistory; + use services::fee_tracker::port::l1::{BlockFees, Fees}; + + use std::ops::RangeInclusive; + + use crate::fee_conversion::{self}; + + #[test] + fn test_chunk_size_zero() { + // given + let initial_range = 1..=10; + let chunk_size = 0; + + // when + let result = fee_conversion::chunk_range_inclusive(initial_range, chunk_size); + + // then + let expected: Vec> = vec![]; + assert_eq!( + result, expected, + "Expected empty vector when chunk_size is zero" + ); + } + + #[test] + fn test_chunk_size_larger_than_range() { + // given + let initial_range = 1..=5; + let chunk_size = 10; + + // when + let result = fee_conversion::chunk_range_inclusive(initial_range, chunk_size); + + // then + let expected = vec![1..=5]; + assert_eq!( + result, expected, + "Expected single chunk when chunk_size exceeds range length" + ); + } + + #[test] + fn test_exact_multiples() { + // given + let initial_range = 1..=10; + let chunk_size = 2; + + // when + let result = fee_conversion::chunk_range_inclusive(initial_range, chunk_size); + + // then + let expected = vec![1..=2, 3..=4, 5..=6, 7..=8, 9..=10]; + assert_eq!(result, expected, "Chunks should exactly divide the range"); + } + + #[test] + fn test_non_exact_multiples() { + // given + let initial_range = 1..=10; + let chunk_size = 3; + + // when + let result = fee_conversion::chunk_range_inclusive(initial_range, chunk_size); + + // then + let expected = vec![1..=3, 4..=6, 7..=9, 10..=10]; + assert_eq!( + result, expected, + "Last chunk should contain the remaining elements" + ); + } + + #[test] + fn test_single_element_range() { + // given + let initial_range = 5..=5; + let chunk_size = 1; + + // when + let result = fee_conversion::chunk_range_inclusive(initial_range, chunk_size); + + // then + let expected = vec![5..=5]; + assert_eq!( + result, expected, + "Single element range should return one chunk with that element" + ); + } + + #[test] + fn test_start_equals_end_with_large_chunk_size() { + // given + let initial_range = 100..=100; + let chunk_size = 50; + + // when + let result = fee_conversion::chunk_range_inclusive(initial_range, chunk_size); + + // then + let expected = vec![100..=100]; + assert_eq!( + result, expected, + "Single element range should return one chunk regardless of chunk_size" + ); + } + + #[test] + fn test_chunk_size_one() { + // given + let initial_range = 10..=15; + let chunk_size = 1; + + // when + let result = fee_conversion::chunk_range_inclusive(initial_range, chunk_size); + + // then + let expected = vec![10..=10, 11..=11, 12..=12, 13..=13, 14..=14, 15..=15]; + assert_eq!( + result, expected, + "Each number should be its own chunk when chunk_size is one" + ); + } + + #[test] + fn test_full_range_chunk() { + // given + let initial_range = 20..=30; + let chunk_size = 11; + + // when + let result = fee_conversion::chunk_range_inclusive(initial_range, chunk_size); + + // then + let expected = vec![20..=30]; + assert_eq!( + result, expected, + "Whole range should 
be a single chunk when chunk_size equals range size" + ); + } + + #[test] + fn test_unpack_fee_history_empty_base_fee() { + // given + let fees = FeeHistory { + oldest_block: 100, + base_fee_per_gas: vec![], + base_fee_per_blob_gas: vec![], + reward: Some(vec![]), + ..Default::default() + }; + + // when + let result = fee_conversion::unpack_fee_history(fees); + + // then + let expected: Vec = vec![]; + assert_eq!( + result.unwrap(), + expected, + "Expected empty vector when base_fee_per_gas is empty" + ); + } + + #[test] + fn test_unpack_fee_history_missing_rewards() { + // given + let fees = FeeHistory { + oldest_block: 200, + base_fee_per_gas: vec![100, 200], + base_fee_per_blob_gas: vec![150, 250], + reward: None, + ..Default::default() + }; + + // when + let result = fee_conversion::unpack_fee_history(fees.clone()); + + // then + let expected_error = services::Error::Other(format!("missing rewards field: {:?}", fees)); + assert_eq!( + result.unwrap_err(), + expected_error, + "Expected error due to missing rewards field" + ); + } + + #[test] + fn test_unpack_fee_history_discrepancy_in_lengths_base_fee_rewards() { + // given + let fees = FeeHistory { + oldest_block: 300, + base_fee_per_gas: vec![100, 200, 300], + base_fee_per_blob_gas: vec![150, 250, 350], + reward: Some(vec![vec![10]]), // Should have 2 rewards for 2 blocks + ..Default::default() + }; + + // when + let result = fee_conversion::unpack_fee_history(fees.clone()); + + // then + let expected_error = + services::Error::Other(format!("discrepancy in lengths of fee fields: {:?}", fees)); + assert_eq!( + result.unwrap_err(), + expected_error, + "Expected error due to discrepancy in lengths of fee fields" + ); + } + + #[test] + fn test_unpack_fee_history_discrepancy_in_lengths_blob_gas() { + // given + let fees = FeeHistory { + oldest_block: 400, + base_fee_per_gas: vec![100, 200, 300], + base_fee_per_blob_gas: vec![150, 250], // Should have 3 elements + reward: Some(vec![vec![10], vec![20]]), + ..Default::default() + }; + + // when + let result = fee_conversion::unpack_fee_history(fees.clone()); + + // then + let expected_error = + services::Error::Other(format!("discrepancy in lengths of fee fields: {:?}", fees)); + assert_eq!( + result.unwrap_err(), + expected_error, + "Expected error due to discrepancy in base_fee_per_blob_gas lengths" + ); + } + + #[test] + fn test_unpack_fee_history_empty_reward_percentile() { + // given + let fees = FeeHistory { + oldest_block: 500, + base_fee_per_gas: vec![100, 200], + base_fee_per_blob_gas: vec![150, 250], + reward: Some(vec![vec![]]), // Empty percentile + ..Default::default() + }; + + // when + let result = fee_conversion::unpack_fee_history(fees.clone()); + + // then + let expected_error = + services::Error::Other("should have had at least one reward percentile".to_string()); + assert_eq!( + result.unwrap_err(), + expected_error, + "Expected error due to empty reward percentile" + ); + } + + #[test] + fn test_unpack_fee_history_single_block() { + // given + let fees = FeeHistory { + oldest_block: 600, + base_fee_per_gas: vec![100, 200], // number_of_blocks =1 + base_fee_per_blob_gas: vec![150, 250], + reward: Some(vec![vec![10]]), + ..Default::default() + }; + + // when + let result = fee_conversion::unpack_fee_history(fees); + + // then + let expected = vec![BlockFees { + height: 600, + fees: Fees { + base_fee_per_gas: 100, + reward: 10, + base_fee_per_blob_gas: 150, + }, + }]; + assert_eq!( + result.unwrap(), + expected, + "Expected one BlockFees entry for a single block" + ); + } + 
+ #[test] + fn test_unpack_fee_history_multiple_blocks() { + // given + let fees = FeeHistory { + oldest_block: 700, + base_fee_per_gas: vec![100, 200, 300, 400], // number_of_blocks =3 + base_fee_per_blob_gas: vec![150, 250, 350, 450], + reward: Some(vec![vec![10], vec![20], vec![30]]), + ..Default::default() + }; + + // when + let result = fee_conversion::unpack_fee_history(fees); + + // then + let expected = vec![ + BlockFees { + height: 700, + fees: Fees { + base_fee_per_gas: 100, + reward: 10, + base_fee_per_blob_gas: 150, + }, + }, + BlockFees { + height: 701, + fees: Fees { + base_fee_per_gas: 200, + reward: 20, + base_fee_per_blob_gas: 250, + }, + }, + BlockFees { + height: 702, + fees: Fees { + base_fee_per_gas: 300, + reward: 30, + base_fee_per_blob_gas: 350, + }, + }, + ]; + assert_eq!( + result.unwrap(), + expected, + "Expected three BlockFees entries for three blocks" + ); + } + + #[test] + fn test_unpack_fee_history_large_values() { + // given + let fees = FeeHistory { + oldest_block: u64::MAX - 2, + base_fee_per_gas: vec![u128::MAX - 2, u128::MAX - 1, u128::MAX], + base_fee_per_blob_gas: vec![u128::MAX - 3, u128::MAX - 2, u128::MAX - 1], + reward: Some(vec![vec![u128::MAX - 4], vec![u128::MAX - 3]]), + ..Default::default() + }; + + // when + let result = fee_conversion::unpack_fee_history(fees.clone()); + + // then + let expected = vec![ + BlockFees { + height: u64::MAX - 2, + fees: Fees { + base_fee_per_gas: u128::MAX - 2, + reward: u128::MAX - 4, + base_fee_per_blob_gas: u128::MAX - 3, + }, + }, + BlockFees { + height: u64::MAX - 1, + fees: Fees { + base_fee_per_gas: u128::MAX - 1, + reward: u128::MAX - 3, + base_fee_per_blob_gas: u128::MAX - 2, + }, + }, + ]; + assert_eq!( + result.unwrap(), + expected, + "Expected BlockFees entries with large u64 values" + ); + } + + #[test] + fn test_unpack_fee_history_full_range_chunk() { + // given + let fees = FeeHistory { + oldest_block: 800, + base_fee_per_gas: vec![500, 600, 700, 800, 900], // number_of_blocks =4 + base_fee_per_blob_gas: vec![550, 650, 750, 850, 950], + reward: Some(vec![vec![50], vec![60], vec![70], vec![80]]), + ..Default::default() + }; + + // when + let result = fee_conversion::unpack_fee_history(fees); + + // then + let expected = vec![ + BlockFees { + height: 800, + fees: Fees { + base_fee_per_gas: 500, + reward: 50, + base_fee_per_blob_gas: 550, + }, + }, + BlockFees { + height: 801, + fees: Fees { + base_fee_per_gas: 600, + reward: 60, + base_fee_per_blob_gas: 650, + }, + }, + BlockFees { + height: 802, + fees: Fees { + base_fee_per_gas: 700, + reward: 70, + base_fee_per_blob_gas: 750, + }, + }, + BlockFees { + height: 803, + fees: Fees { + base_fee_per_gas: 800, + reward: 80, + base_fee_per_blob_gas: 850, + }, + }, + ]; + assert_eq!( + result.unwrap(), + expected, + "Expected BlockFees entries matching the full range chunk" + ); + } +} diff --git a/packages/adapters/eth/src/lib.rs b/packages/adapters/eth/src/lib.rs index 38e069fc..c3a27c29 100644 --- a/packages/adapters/eth/src/lib.rs +++ b/packages/adapters/eth/src/lib.rs @@ -1,13 +1,20 @@ -use std::num::{NonZeroU32, NonZeroUsize}; +use std::{ + num::{NonZeroU32, NonZeroUsize}, + ops::RangeInclusive, + time::Duration, +}; use alloy::{ consensus::BlobTransactionSidecar, eips::eip4844::{BYTES_PER_BLOB, DATA_GAS_PER_BLOB}, primitives::U256, + rpc::types::FeeHistory, }; use delegate::delegate; +use futures::{stream, StreamExt, TryStreamExt}; use itertools::{izip, Itertools}; use services::{ + fee_tracker::port::l1::SequentialBlockFees, types::{ 
BlockSubmissionTx, Fragment, FragmentsSubmitted, L1Height, L1Tx, NonEmpty, NonNegative, TransactionResponse, @@ -17,17 +24,42 @@ use services::{ mod aws; mod error; +mod fee_conversion; mod metrics; mod websocket; pub use alloy::primitives::Address; pub use aws::*; use fuel_block_committer_encoding::blob::{self, generate_sidecar}; +use static_assertions::const_assert; pub use websocket::{L1Key, L1Keys, Signer, Signers, TxConfig, WebsocketClient}; #[derive(Debug, Copy, Clone)] pub struct BlobEncoder; +pub async fn make_pub_eth_client() -> WebsocketClient { + let signers = Signers::for_keys(crate::L1Keys { + main: crate::L1Key::Private( + "98d88144512cc5747fed20bdc81fb820c4785f7411bd65a88526f3b084dc931e".to_string(), + ), + blob: None, + }) + .await + .unwrap(); + + crate::WebsocketClient::connect( + "wss://ethereum-rpc.publicnode.com".parse().unwrap(), + Default::default(), + signers, + 10, + crate::TxConfig { + tx_max_fee: u128::MAX, + send_tx_request_timeout: Duration::MAX, + }, + ) + .await + .unwrap() +} impl BlobEncoder { #[cfg(feature = "test-helpers")] pub const FRAGMENT_SIZE: usize = BYTES_PER_BLOB; @@ -174,7 +206,44 @@ impl services::block_committer::port::l1::Api for WebsocketClient { } } +impl services::fee_tracker::port::l1::Api for WebsocketClient { + async fn current_height(&self) -> Result { + self._get_block_number().await + } + + async fn fees(&self, height_range: RangeInclusive) -> Result { + const REWARD_PERCENTILE: f64 = + alloy::providers::utils::EIP1559_FEE_ESTIMATION_REWARD_PERCENTILE; + // so that a alloy version bump doesn't surprise us + const_assert!(REWARD_PERCENTILE == 20.0,); + + // There is a comment in alloy about not doing more than 1024 blocks at a time + const RPC_LIMIT: u64 = 1024; + + let fees: Vec = stream::iter(fee_conversion::chunk_range_inclusive( + height_range, + RPC_LIMIT, + )) + .then(|range| self.fees(range, std::slice::from_ref(&REWARD_PERCENTILE))) + .try_collect() + .await?; + + let mut unpacked_fees = vec![]; + for fee in fees { + unpacked_fees.extend(fee_conversion::unpack_fee_history(fee)?); + } + + unpacked_fees + .try_into() + .map_err(|e| services::Error::Other(format!("{e}"))) + } +} + impl services::state_committer::port::l1::Api for WebsocketClient { + async fn current_height(&self) -> Result { + self._get_block_number().await + } + delegate! { to (*self) { async fn submit_state_fragments( diff --git a/packages/adapters/eth/src/websocket.rs b/packages/adapters/eth/src/websocket.rs index 9ccd8091..8b64038b 100644 --- a/packages/adapters/eth/src/websocket.rs +++ b/packages/adapters/eth/src/websocket.rs @@ -1,10 +1,11 @@ -use std::{num::NonZeroU32, str::FromStr, time::Duration}; +use std::{num::NonZeroU32, ops::RangeInclusive, str::FromStr, time::Duration}; use ::metrics::{prometheus::core::Collector, HealthChecker, RegistersMetrics}; use alloy::{ consensus::SignableTransaction, network::TxSigner, primitives::{Address, ChainId, B256}, + rpc::types::FeeHistory, signers::{local::PrivateKeySigner, Signature}, }; use serde::Deserialize; @@ -225,6 +226,14 @@ impl WebsocketClient { Ok(self.inner.get_transaction_response(tx_hash).await?) } + pub(crate) async fn fees( + &self, + height_range: RangeInclusive, + rewards_percentile: &[f64], + ) -> Result { + Ok(self.inner.fees(height_range, rewards_percentile).await?) + } + pub(crate) async fn is_squeezed_out(&self, tx_hash: [u8; 32]) -> Result { Ok(self.inner.is_squeezed_out(tx_hash).await?) 
} diff --git a/packages/adapters/eth/src/websocket/connection.rs b/packages/adapters/eth/src/websocket/connection.rs index 8a718b9f..f6729d21 100644 --- a/packages/adapters/eth/src/websocket/connection.rs +++ b/packages/adapters/eth/src/websocket/connection.rs @@ -1,6 +1,7 @@ use std::{ cmp::{max, min}, num::NonZeroU32, + ops::RangeInclusive, time::Duration, }; @@ -14,7 +15,7 @@ use alloy::{ primitives::{Address, U256}, providers::{utils::Eip1559Estimation, Provider, ProviderBuilder, SendableTx, WsConnect}, pubsub::PubSubFrontend, - rpc::types::{TransactionReceipt, TransactionRequest}, + rpc::types::{FeeHistory, TransactionReceipt, TransactionRequest}, sol, }; use itertools::Itertools; @@ -212,6 +213,19 @@ impl EthApi for WsConnection { Ok(submission_tx) } + async fn fees( + &self, + height_range: RangeInclusive, + reward_percentiles: &[f64], + ) -> Result { + let max = *height_range.end(); + let count = height_range.clone().count() as u64; + Ok(self + .provider + .get_fee_history(count, BlockNumberOrTag::Number(max), reward_percentiles) + .await?) + } + async fn get_block_number(&self) -> Result { let response = self.provider.get_block_number().await?; Ok(response) diff --git a/packages/adapters/eth/src/websocket/health_tracking_middleware.rs b/packages/adapters/eth/src/websocket/health_tracking_middleware.rs index 21f5c3aa..93045485 100644 --- a/packages/adapters/eth/src/websocket/health_tracking_middleware.rs +++ b/packages/adapters/eth/src/websocket/health_tracking_middleware.rs @@ -1,8 +1,9 @@ -use std::num::NonZeroU32; +use std::{num::NonZeroU32, ops::RangeInclusive}; use ::metrics::{ prometheus::core::Collector, ConnectionHealthTracker, HealthChecker, RegistersMetrics, }; +use alloy::rpc::types::FeeHistory; use delegate::delegate; use services::types::{Address, BlockSubmissionTx, Fragment, NonEmpty, TransactionResponse, U256}; @@ -15,6 +16,11 @@ use crate::{ #[async_trait::async_trait] pub trait EthApi { async fn submit(&self, hash: [u8; 32], height: u32) -> Result; + async fn fees( + &self, + height_range: RangeInclusive, + reward_percentiles: &[f64], + ) -> Result; async fn get_block_number(&self) -> Result; async fn balance(&self, address: Address) -> Result; fn commit_interval(&self) -> NonZeroU32; @@ -117,6 +123,16 @@ where response } + async fn fees( + &self, + height_range: RangeInclusive, + reward_percentiles: &[f64], + ) -> Result { + let response = self.adapter.fees(height_range, reward_percentiles).await; + self.note_network_status(&response); + response + } + async fn is_squeezed_out(&self, tx_hash: [u8; 32]) -> Result { let response = self.adapter.is_squeezed_out(tx_hash).await; self.note_network_status(&response); diff --git a/packages/adapters/storage/src/lib.rs b/packages/adapters/storage/src/lib.rs index 8ea34bbc..6f9360d4 100644 --- a/packages/adapters/storage/src/lib.rs +++ b/packages/adapters/storage/src/lib.rs @@ -50,6 +50,10 @@ impl services::cost_reporter::port::Storage for Postgres { .await .map_err(Into::into) } + + async fn get_latest_costs(&self, limit: usize) -> Result> { + self._get_latest_costs(limit).await.map_err(Into::into) + } } impl services::status_reporter::port::Storage for Postgres { @@ -1163,4 +1167,38 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn get_latest_finalized_costs() -> Result<()> { + use services::cost_reporter::port::Storage; + + // given + let storage = start_db().await; + + for i in 0..5 { + let start_height = i * 10 + 1; + let end_height = start_height + 9; + let block_range = start_height..=end_height; + + 
ensure_finalized_fragments_exist_in_the_db( + storage.clone(), + block_range, + 1000u128, + 5000u64, + ) + .await; + } + + // when + let finalized_costs = storage.get_latest_costs(1).await?; + + // then + assert_eq!(finalized_costs.len(), 1); + let finalized_cost = &finalized_costs[0]; + + assert_eq!(finalized_cost.start_height, 41); + assert_eq!(finalized_cost.end_height, 50); + + Ok(()) + } } diff --git a/packages/adapters/storage/src/mappings/tables.rs b/packages/adapters/storage/src/mappings/tables.rs index d93fbee7..626d431a 100644 --- a/packages/adapters/storage/src/mappings/tables.rs +++ b/packages/adapters/storage/src/mappings/tables.rs @@ -193,6 +193,7 @@ pub struct BundleFragment { pub data: Vec, pub unused_bytes: i64, pub total_bytes: i64, + pub start_height: i64, } impl TryFrom for services::types::storage::BundleFragment { @@ -261,10 +262,18 @@ impl TryFrom for services::types::storage::BundleFragment { total_bytes, }; + let start_height = value.start_height.try_into().map_err(|e| { + crate::error::Error::Conversion(format!( + "Invalid db `start_height` ({}). Reason: {e}", + value.start_height + )) + })?; + Ok(Self { id, - idx, bundle_id, + idx, + oldest_block_in_bundle: start_height, fragment, }) } diff --git a/packages/adapters/storage/src/postgres.rs b/packages/adapters/storage/src/postgres.rs index f933703a..5401debe 100644 --- a/packages/adapters/storage/src/postgres.rs +++ b/packages/adapters/storage/src/postgres.rs @@ -277,7 +277,8 @@ impl Postgres { sub.bundle_id, sub.data, sub.unused_bytes, - sub.total_bytes + sub.total_bytes, + sub.start_height FROM ( SELECT DISTINCT ON (f.id) f.*, @@ -323,11 +324,14 @@ impl Postgres { let fragments = sqlx::query_as!( tables::BundleFragment, r#" - SELECT f.* - FROM l1_fragments f - JOIN l1_transaction_fragments tf ON tf.fragment_id = f.id - JOIN l1_blob_transaction t ON t.id = tf.transaction_id - WHERE t.hash = $1 + SELECT + f.*, + b.start_height + FROM l1_fragments f + JOIN l1_transaction_fragments tf ON tf.fragment_id = f.id + JOIN l1_blob_transaction t ON t.id = tf.transaction_id + JOIN bundles b ON b.id = f.bundle_id + WHERE t.hash = $1 "#, tx_hash.as_slice() ) @@ -861,6 +865,36 @@ impl Postgres { .collect::>>() } + pub(crate) async fn _get_latest_costs(&self, limit: usize) -> Result> { + sqlx::query_as!( + tables::BundleCost, + r#" + SELECT + bc.bundle_id, + bc.cost, + bc.size, + bc.da_block_height, + bc.is_finalized, + b.start_height, + b.end_height + FROM + bundle_cost bc + JOIN bundles b ON bc.bundle_id = b.id + WHERE + bc.is_finalized = TRUE + ORDER BY + b.start_height DESC + LIMIT $1 + "#, + limit as i64 + ) + .fetch_all(&self.connection_pool) + .await? 
+ .into_iter() + .map(BundleCost::try_from) + .collect::>>() + } + pub(crate) async fn _next_bundle_id(&self) -> Result> { let next_id = sqlx::query!("SELECT nextval(pg_get_serial_sequence('bundles', 'id'))") .fetch_one(&self.connection_pool) diff --git a/packages/adapters/storage/src/test_instance.rs b/packages/adapters/storage/src/test_instance.rs index b4baa4ea..824df64c 100644 --- a/packages/adapters/storage/src/test_instance.rs +++ b/packages/adapters/storage/src/test_instance.rs @@ -1,9 +1,3 @@ -use std::{ - borrow::Cow, - ops::RangeInclusive, - sync::{Arc, Weak}, -}; - use delegate::delegate; use services::{ block_bundler, block_committer, block_importer, @@ -14,6 +8,11 @@ use services::{ }, }; use sqlx::Executor; +use std::{ + borrow::Cow, + ops::RangeInclusive, + sync::{Arc, Weak}, +}; use testcontainers::{ core::{ContainerPort, WaitFor}, runners::AsyncRunner, @@ -351,4 +350,8 @@ impl services::cost_reporter::port::Storage for DbWithProcess { .await .map_err(Into::into) } + + async fn get_latest_costs(&self, limit: usize) -> services::Result> { + self.db._get_latest_costs(limit).await.map_err(Into::into) + } } diff --git a/packages/services/Cargo.toml b/packages/services/Cargo.toml index d23a2313..644c6b78 100644 --- a/packages/services/Cargo.toml +++ b/packages/services/Cargo.toml @@ -46,6 +46,8 @@ tai64 = { workspace = true } tokio = { workspace = true, features = ["macros"] } test-helpers = { workspace = true } rand = { workspace = true, features = ["small_rng", "std", "std_rng"] } +csv = "1.3" +proptest = { workspace = true, features = ["default"] } [features] test-helpers = ["dep:mockall", "dep:rand"] diff --git a/packages/services/src/cost_reporter.rs b/packages/services/src/cost_reporter.rs index b564c879..600b699e 100644 --- a/packages/services/src/cost_reporter.rs +++ b/packages/services/src/cost_reporter.rs @@ -36,6 +36,17 @@ pub mod service { .get_finalized_costs(from_block_height, limit) .await } + + pub async fn get_latest_costs(&self, limit: usize) -> Result> { + if limit > self.request_limit { + return Err(Error::Other(format!( + "requested: {} items, but limit is: {}", + limit, self.request_limit + ))); + } + + self.storage.get_latest_costs(limit).await + } } } @@ -50,5 +61,7 @@ pub mod port { from_block_height: u32, limit: usize, ) -> Result>; + + async fn get_latest_costs(&self, limit: usize) -> Result>; } } diff --git a/packages/services/src/fee_tracker.rs b/packages/services/src/fee_tracker.rs new file mode 100644 index 00000000..a977c59c --- /dev/null +++ b/packages/services/src/fee_tracker.rs @@ -0,0 +1,4 @@ +pub mod port; +pub mod service; + +mod fee_analytics; diff --git a/packages/services/src/fee_tracker/fee_analytics.rs b/packages/services/src/fee_tracker/fee_analytics.rs new file mode 100644 index 00000000..1f0d8144 --- /dev/null +++ b/packages/services/src/fee_tracker/fee_analytics.rs @@ -0,0 +1,389 @@ +use std::ops::RangeInclusive; + +use crate::Error; + +use super::port::l1::{Api, BlockFees, Fees, SequentialBlockFees}; + +#[derive(Debug, Clone)] +pub struct FeeAnalytics

<P> {
+    fees_provider: P,
+}
+
+impl<P> FeeAnalytics<P> {
+    pub fn new(fees_provider: P) -> Self {
+        Self { fees_provider }
+    }
+}
+
+impl<P: Api> FeeAnalytics<P>
{ + pub async fn calculate_sma(&self, block_range: RangeInclusive) -> crate::Result { + let fees = self.fees_provider.fees(block_range.clone()).await?; + + let received_height_range = fees.height_range(); + if received_height_range != block_range { + return Err(Error::from(format!( + "fees received from the adapter({received_height_range:?}) don't cover the requested range ({block_range:?})" + ))); + } + + Ok(Self::mean(fees)) + } + + pub async fn latest_fees(&self) -> crate::Result { + let height = self.fees_provider.current_height().await?; + + let fee = self + .fees_provider + .fees(height..=height) + .await? + .into_iter() + .next() + .expect("sequential fees guaranteed not empty"); + + Ok(fee) + } + + fn mean(fees: SequentialBlockFees) -> Fees { + let count = fees.len() as u128; + + let total = fees + .into_iter() + .map(|bf| bf.fees) + .fold(Fees::default(), |acc, f| Fees { + base_fee_per_gas: acc.base_fee_per_gas + f.base_fee_per_gas, + reward: acc.reward + f.reward, + base_fee_per_blob_gas: acc.base_fee_per_blob_gas + f.base_fee_per_blob_gas, + }); + + // TODO: segfault should we round to nearest here? + Fees { + base_fee_per_gas: total.base_fee_per_gas.saturating_div(count), + reward: total.reward.saturating_div(count), + base_fee_per_blob_gas: total.base_fee_per_blob_gas.saturating_div(count), + } + } +} + +#[cfg(test)] +mod tests { + use itertools::Itertools; + + use crate::fee_tracker::port::l1::{testing, BlockFees}; + + use super::*; + + #[test] + fn can_create_valid_sequential_fees() { + // Given + let block_fees = vec![ + BlockFees { + height: 1, + fees: Fees { + base_fee_per_gas: 100, + reward: 50, + base_fee_per_blob_gas: 10, + }, + }, + BlockFees { + height: 2, + fees: Fees { + base_fee_per_gas: 110, + reward: 55, + base_fee_per_blob_gas: 15, + }, + }, + ]; + + // When + let result = SequentialBlockFees::try_from(block_fees.clone()); + + // Then + assert!( + result.is_ok(), + "Expected SequentialBlockFees creation to succeed" + ); + let sequential_fees = result.unwrap(); + assert_eq!(sequential_fees.len(), block_fees.len()); + } + + #[test] + fn sequential_fees_cannot_be_empty() { + // Given + let block_fees: Vec = vec![]; + + // When + let result = SequentialBlockFees::try_from(block_fees); + + // Then + assert!( + result.is_err(), + "Expected SequentialBlockFees creation to fail for empty input" + ); + assert_eq!( + result.unwrap_err().to_string(), + "InvalidSequence(\"Input cannot be empty\")" + ); + } + + #[test] + fn fees_must_be_sequential() { + // Given + let block_fees = vec![ + BlockFees { + height: 1, + fees: Fees { + base_fee_per_gas: 100, + reward: 50, + base_fee_per_blob_gas: 10, + }, + }, + BlockFees { + height: 3, // Non-sequential height + fees: Fees { + base_fee_per_gas: 110, + reward: 55, + base_fee_per_blob_gas: 15, + }, + }, + ]; + + // When + let result = SequentialBlockFees::try_from(block_fees); + + // Then + assert!( + result.is_err(), + "Expected SequentialBlockFees creation to fail for non-sequential heights" + ); + assert_eq!( + result.unwrap_err().to_string(), + "InvalidSequence(\"blocks are not sequential by height: [1, 3]\")" + ); + } + + // TODO: segfault add more tests so that the in-order iteration invariant is properly tested + #[test] + fn produced_iterator_gives_correct_values() { + // Given + // notice the heights are out of order so that we validate that the returned sequence is in + // order + let block_fees = vec![ + BlockFees { + height: 2, + fees: Fees { + base_fee_per_gas: 110, + reward: 55, + base_fee_per_blob_gas: 15, + }, + }, 
+ BlockFees { + height: 1, + fees: Fees { + base_fee_per_gas: 100, + reward: 50, + base_fee_per_blob_gas: 10, + }, + }, + ]; + let sequential_fees = SequentialBlockFees::try_from(block_fees.clone()).unwrap(); + + // When + let iterated_fees: Vec = sequential_fees.into_iter().collect(); + + // Then + let expectation = block_fees + .into_iter() + .sorted_by_key(|b| b.height) + .collect_vec(); + assert_eq!( + iterated_fees, expectation, + "Expected iterator to yield the same block fees" + ); + } + + #[tokio::test] + async fn calculates_sma_correctly_for_last_1_block() { + // given + let fees_provider = testing::PreconfiguredFeeApi::new(testing::incrementing_fees(5)); + let fee_analytics = FeeAnalytics::new(fees_provider); + + // when + let sma = fee_analytics.calculate_sma(4..=4).await.unwrap(); + + // then + assert_eq!(sma.base_fee_per_gas, 5); + assert_eq!(sma.reward, 5); + assert_eq!(sma.base_fee_per_blob_gas, 5); + } + + #[tokio::test] + async fn calculates_sma_correctly_for_last_5_blocks() { + // given + let fees_provider = testing::PreconfiguredFeeApi::new(testing::incrementing_fees(5)); + let fee_analytics = FeeAnalytics::new(fees_provider); + + // when + let sma = fee_analytics.calculate_sma(0..=4).await.unwrap(); + + // then + let mean = (5 + 4 + 3 + 2 + 1) / 5; + assert_eq!(sma.base_fee_per_gas, mean); + assert_eq!(sma.reward, mean); + assert_eq!(sma.base_fee_per_blob_gas, mean); + } + + #[tokio::test] + async fn errors_out_if_returned_fees_are_not_complete() { + // given + let mut fees = testing::incrementing_fees(5); + fees.remove(&4); + let fees_provider = testing::PreconfiguredFeeApi::new(fees); + let fee_analytics = FeeAnalytics::new(fees_provider); + + // when + let err = fee_analytics + .calculate_sma(0..=4) + .await + .expect_err("should have failed because returned fees are not complete"); + + // then + assert_eq!( + err.to_string(), + "fees received from the adapter(0..=3) don't cover the requested range (0..=4)" + ); + } + + #[tokio::test] + async fn latest_fees_on_fee_analytics() { + // given + let fees_map = testing::incrementing_fees(5); + let fees_provider = testing::PreconfiguredFeeApi::new(fees_map.clone()); + let fee_analytics = FeeAnalytics::new(fees_provider); + let height = 4; + + // when + let fee = fee_analytics.latest_fees().await.unwrap(); + + // then + let expected_fee = BlockFees { + height, + fees: Fees { + base_fee_per_gas: 5, + reward: 5, + base_fee_per_blob_gas: 5, + }, + }; + assert_eq!( + fee, expected_fee, + "Fee at height {height} should be {expected_fee:?}" + ); + } + + // fn calculate_tx_fee(fees: &Fees) -> u128 { + // 21_000 * fees.base_fee_per_gas + fees.reward + 6 * fees.base_fee_per_blob_gas * 131_072 + // } + // + // fn save_tx_fees(tx_fees: &[(u64, u128)], path: &str) { + // let mut csv_writer = + // csv::Writer::from_path(PathBuf::from("/home/segfault_magnet/grafovi/").join(path)) + // .unwrap(); + // csv_writer + // .write_record(["height", "tx_fee"].iter()) + // .unwrap(); + // for (height, fee) in tx_fees { + // csv_writer + // .write_record([height.to_string(), fee.to_string()]) + // .unwrap(); + // } + // csv_writer.flush().unwrap(); + // } + + // #[tokio::test] + // async fn something() { + // let client = make_pub_eth_client().await; + // use services::fee_analytics::port::l1::FeesProvider; + // + // let current_block_height = 21408300; + // let starting_block_height = current_block_height - 48 * 3600 / 12; + // let data = client + // .fees(starting_block_height..=current_block_height) + // .await + // .into_iter() + // 
.collect::>(); + // + // let fee_lookup = data + // .iter() + // .map(|b| (b.height, b.fees)) + // .collect::>(); + // + // let short_sma = 25u64; + // let long_sma = 900; + // + // let current_tx_fees = data + // .iter() + // .map(|b| (b.height, calculate_tx_fee(&b.fees))) + // .collect::>(); + // + // save_tx_fees(¤t_tx_fees, "current_fees.csv"); + // + // let local_client = TestFeesProvider::new(data.clone().into_iter().map(|e| (e.height, e.fees))); + // let fee_analytics = FeeAnalytics::new(local_client.clone()); + // + // let mut short_sma_tx_fees = vec![]; + // for height in (starting_block_height..=current_block_height).skip(short_sma as usize) { + // let fees = fee_analytics + // .calculate_sma(height - short_sma..=height) + // .await; + // + // let tx_fee = calculate_tx_fee(&fees); + // + // short_sma_tx_fees.push((height, tx_fee)); + // } + // save_tx_fees(&short_sma_tx_fees, "short_sma_fees.csv"); + // + // let decider = SendOrWaitDecider::new( + // FeeAnalytics::new(local_client.clone()), + // services::state_committer::fee_optimization::Config { + // sma_periods: services::state_committer::fee_optimization::SmaBlockNumPeriods { + // short: short_sma, + // long: long_sma, + // }, + // fee_thresholds: Feethresholds { + // max_l2_blocks_behind: 43200 * 3, + // start_discount_percentage: 0.2, + // end_premium_percentage: 0.2, + // always_acceptable_fee: 1000000000000000u128, + // }, + // }, + // ); + // + // let mut decisions = vec![]; + // let mut long_sma_tx_fees = vec![]; + // + // for height in (starting_block_height..=current_block_height).skip(long_sma as usize) { + // let fees = fee_analytics + // .calculate_sma(height - long_sma..=height) + // .await; + // let tx_fee = calculate_tx_fee(&fees); + // long_sma_tx_fees.push((height, tx_fee)); + // + // if decider + // .should_send_blob_tx( + // 6, + // Context { + // at_l1_height: height, + // num_l2_blocks_behind: (height - starting_block_height) * 12, + // }, + // ) + // .await + // { + // let current_fees = fee_lookup.get(&height).unwrap(); + // let current_tx_fee = calculate_tx_fee(current_fees); + // decisions.push((height, current_tx_fee)); + // } + // } + // + // save_tx_fees(&long_sma_tx_fees, "long_sma_fees.csv"); + // save_tx_fees(&decisions, "decisions.csv"); + // } +} diff --git a/packages/services/src/fee_tracker/port.rs b/packages/services/src/fee_tracker/port.rs new file mode 100644 index 00000000..e7ba4f55 --- /dev/null +++ b/packages/services/src/fee_tracker/port.rs @@ -0,0 +1,463 @@ +pub mod l1 { + #[derive(Default, Debug, Clone, Copy, PartialEq, Eq)] + pub struct Fees { + pub base_fee_per_gas: u128, + pub reward: u128, + pub base_fee_per_blob_gas: u128, + } + + #[derive(Debug, Clone, Copy, PartialEq, Eq)] + pub struct BlockFees { + pub height: u64, + pub fees: Fees, + } + use std::ops::RangeInclusive; + + use itertools::Itertools; + + #[derive(Debug, PartialEq, Eq)] + pub struct SequentialBlockFees { + fees: Vec, + } + + #[derive(Debug)] + pub struct InvalidSequence(String); + + impl std::error::Error for InvalidSequence {} + + impl std::fmt::Display for InvalidSequence { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{self:?}") + } + } + + impl IntoIterator for SequentialBlockFees { + type Item = BlockFees; + type IntoIter = std::vec::IntoIter; + fn into_iter(self) -> Self::IntoIter { + self.fees.into_iter() + } + } + + // Cannot be empty + #[allow(clippy::len_without_is_empty)] + impl SequentialBlockFees { + pub fn len(&self) -> usize { + self.fees.len() + } + + pub fn 
height_range(&self) -> RangeInclusive { + let start = self.fees.first().expect("not empty").height; + let end = self.fees.last().expect("not empty").height; + start..=end + } + } + impl TryFrom> for SequentialBlockFees { + type Error = InvalidSequence; + fn try_from(mut fees: Vec) -> Result { + if fees.is_empty() { + return Err(InvalidSequence("Input cannot be empty".to_string())); + } + + fees.sort_by_key(|f| f.height); + + let is_sequential = fees + .iter() + .tuple_windows() + .all(|(l, r)| l.height + 1 == r.height); + + let heights = fees.iter().map(|f| f.height).collect::>(); + if !is_sequential { + return Err(InvalidSequence(format!( + "blocks are not sequential by height: {heights:?}" + ))); + } + + Ok(Self { fees }) + } + } + + #[allow(async_fn_in_trait)] + #[trait_variant::make(Send)] + #[cfg_attr(feature = "test-helpers", mockall::automock)] + pub trait Api { + async fn fees( + &self, + height_range: RangeInclusive, + ) -> crate::Result; + async fn current_height(&self) -> crate::Result; + } + + #[cfg(feature = "test-helpers")] + pub mod testing { + + use std::{collections::BTreeMap, ops::RangeInclusive}; + + use itertools::Itertools; + + use super::{Api, BlockFees, Fees, SequentialBlockFees}; + + #[derive(Debug, Clone, Copy)] + pub struct ConstantFeeApi { + fees: Fees, + } + + impl ConstantFeeApi { + pub fn new(fees: Fees) -> Self { + Self { fees } + } + } + + impl Api for ConstantFeeApi { + async fn fees( + &self, + height_range: RangeInclusive, + ) -> crate::Result { + let fees = height_range + .into_iter() + .map(|height| BlockFees { + height, + fees: self.fees, + }) + .collect_vec(); + + Ok(fees.try_into().unwrap()) + } + + async fn current_height(&self) -> crate::Result { + Ok(0) + } + } + + #[derive(Debug, Clone)] + pub struct PreconfiguredFeeApi { + fees: BTreeMap, + } + + impl Api for PreconfiguredFeeApi { + async fn current_height(&self) -> crate::Result { + Ok(*self + .fees + .keys() + .last() + .expect("no fees registered with PreconfiguredFeesProvider")) + } + + async fn fees( + &self, + height_range: RangeInclusive, + ) -> crate::Result { + let fees = self + .fees + .iter() + .skip_while(|(height, _)| !height_range.contains(height)) + .take_while(|(height, _)| height_range.contains(height)) + .map(|(height, fees)| BlockFees { + height: *height, + fees: *fees, + }) + .collect_vec(); + + Ok(fees.try_into().expect("block fees not sequential")) + } + } + + impl PreconfiguredFeeApi { + pub fn new(blocks: impl IntoIterator) -> Self { + Self { + fees: blocks.into_iter().collect(), + } + } + } + + pub fn incrementing_fees(num_blocks: u64) -> BTreeMap { + (0..num_blocks) + .map(|i| { + ( + i, + Fees { + base_fee_per_gas: i as u128 + 1, + reward: i as u128 + 1, + base_fee_per_blob_gas: i as u128 + 1, + }, + ) + }) + .collect() + } + } +} + +pub mod cache { + use std::{collections::BTreeMap, ops::RangeInclusive, sync::Arc}; + + use tokio::sync::RwLock; + + use crate::Error; + + use super::l1::{Api, BlockFees, Fees, SequentialBlockFees}; + + #[derive(Debug, Clone)] + pub struct CachingApi

<P> {
+        fees_provider: P,
+        cache: Arc<RwLock<BTreeMap<u64, Fees>>>,
+        cache_limit: usize,
+    }
+
+    impl<P> CachingApi<P> {
+        pub fn new(fees_provider: P, cache_limit: usize) -> Self {
+            Self {
+                fees_provider,
+                cache: Arc::new(RwLock::new(BTreeMap::new())),
+                cache_limit,
+            }
+        }
+    }
+
+    impl<P: Api + Send + Sync> Api for CachingApi<P> {
+        async fn fees(
+            &self,
+            height_range: RangeInclusive<u64>,
+        ) -> crate::Result<SequentialBlockFees> {
+            self.get_fees(height_range).await
+        }
+
+        async fn current_height(&self) -> crate::Result<u64> {
+            self.fees_provider.current_height().await
+        }
+    }
+
+    impl<P: Api + Send + Sync> CachingApi<P>

{ + pub async fn get_fees( + &self, + height_range: RangeInclusive, + ) -> crate::Result { + let mut missing_heights = vec![]; + + // Mind the scope to release the read lock + { + let cache = self.cache.read().await; + for height in height_range.clone() { + if !cache.contains_key(&height) { + missing_heights.push(height); + } + } + } + + if !missing_heights.is_empty() { + let fetched_fees = self + .fees_provider + .fees( + *missing_heights.first().expect("not empty") + ..=*missing_heights.last().expect("not empty"), + ) + .await?; + + let mut cache = self.cache.write().await; + for block_fee in fetched_fees { + cache.insert(block_fee.height, block_fee.fees); + } + } + + let fees: Vec<_> = { + let cache = self.cache.read().await; + height_range + .filter_map(|h| { + cache.get(&h).map(|f| BlockFees { + height: h, + fees: *f, + }) + }) + .collect() + }; + + self.shrink_cache().await; + + SequentialBlockFees::try_from(fees).map_err(|e| Error::Other(e.to_string())) + } + + async fn shrink_cache(&self) { + let mut cache = self.cache.write().await; + while cache.len() > self.cache_limit { + cache.pop_first(); + } + } + } + + #[cfg(test)] + mod tests { + use std::ops::RangeInclusive; + + use mockall::{predicate::eq, Sequence}; + + use crate::fee_tracker::port::{ + cache::CachingApi, + l1::{BlockFees, Fees, MockApi, SequentialBlockFees}, + }; + + #[tokio::test] + async fn caching_provider_avoids_duplicate_requests() { + // given + let mut mock_provider = MockApi::new(); + + mock_provider + .expect_fees() + .with(eq(0..=4)) + .once() + .return_once(|range| { + Box::pin(async move { + Ok(SequentialBlockFees::try_from( + range + .map(|h| BlockFees { + height: h, + fees: Fees { + base_fee_per_gas: h as u128, + reward: h as u128, + base_fee_per_blob_gas: h as u128, + }, + }) + .collect::>(), + ) + .unwrap()) + }) + }); + + let provider = CachingApi::new(mock_provider, 5); + let _ = provider.get_fees(0..=4).await.unwrap(); + + // when + let _ = provider.get_fees(0..=4).await.unwrap(); + + // then + // mock validates no extra calls made + } + + #[tokio::test] + async fn caching_provider_fetches_only_missing_blocks() { + // given + let mut mock_provider = MockApi::new(); + + let mut sequence = Sequence::new(); + mock_provider + .expect_fees() + .with(eq(0..=2)) + .once() + .return_once(|range| { + Box::pin(async move { + Ok(SequentialBlockFees::try_from( + range + .map(|h| BlockFees { + height: h, + fees: Fees { + base_fee_per_gas: h as u128, + reward: h as u128, + base_fee_per_blob_gas: h as u128, + }, + }) + .collect::>(), + ) + .unwrap()) + }) + }) + .in_sequence(&mut sequence); + + mock_provider + .expect_fees() + .with(eq(3..=5)) + .once() + .return_once(|range| { + Box::pin(async move { + Ok(SequentialBlockFees::try_from( + range + .map(|h| BlockFees { + height: h, + fees: Fees { + base_fee_per_gas: h as u128, + reward: h as u128, + base_fee_per_blob_gas: h as u128, + }, + }) + .collect::>(), + ) + .unwrap()) + }) + }) + .in_sequence(&mut sequence); + + let provider = CachingApi::new(mock_provider, 5); + let _ = provider.get_fees(0..=2).await.unwrap(); + + // when + let _ = provider.get_fees(2..=5).await.unwrap(); + + // then + // not called for the overlapping area + } + + #[tokio::test] + async fn caching_provider_evicts_oldest_blocks() { + // given + let mut mock_provider = MockApi::new(); + + mock_provider + .expect_fees() + .with(eq(0..=4)) + .times(2) + .returning(|range| Box::pin(async { Ok(generate_sequential_fees(range)) })); + + mock_provider + .expect_fees() + .with(eq(5..=9)) + .times(1) 
+ .returning(|range| Box::pin(async { Ok(generate_sequential_fees(range)) })); + + let provider = CachingApi::new(mock_provider, 5); + let _ = provider.get_fees(0..=4).await.unwrap(); + let _ = provider.get_fees(5..=9).await.unwrap(); + + // when + let _ = provider.get_fees(0..=4).await.unwrap(); + + // then + // will refetch 0..=4 due to eviction + } + + #[tokio::test] + async fn caching_provider_handles_request_larger_than_cache() { + use mockall::predicate::*; + + // given + let mut mock_provider = MockApi::new(); + + let cache_limit = 5; + + mock_provider + .expect_fees() + .with(eq(0..=9)) + .times(1) + .returning(|range| Box::pin(async move { Ok(generate_sequential_fees(range)) })); + + let provider = CachingApi::new(mock_provider, cache_limit); + + // when + let result = provider.get_fees(0..=9).await.unwrap(); + + assert_eq!(result, generate_sequential_fees(0..=9)); + } + + fn generate_sequential_fees(height_range: RangeInclusive) -> SequentialBlockFees { + SequentialBlockFees::try_from( + height_range + .map(|h| BlockFees { + height: h, + fees: Fees { + base_fee_per_gas: h as u128, + reward: h as u128, + base_fee_per_blob_gas: h as u128, + }, + }) + .collect::>(), + ) + .unwrap() + } + } +} diff --git a/packages/services/src/fee_tracker/service.rs b/packages/services/src/fee_tracker/service.rs new file mode 100644 index 00000000..680afa13 --- /dev/null +++ b/packages/services/src/fee_tracker/service.rs @@ -0,0 +1,457 @@ +use std::{ + cmp::min, + num::{NonZeroU32, NonZeroU64}, + ops::RangeInclusive, +}; + +use metrics::{ + prometheus::{core::Collector, IntGauge, Opts}, + RegistersMetrics, +}; +use tracing::info; + +use crate::{state_committer::service::SendOrWaitDecider, Error, Result, Runner}; + +use super::{ + fee_analytics::FeeAnalytics, + port::l1::{Api, BlockFees, Fees}, +}; + +#[derive(Debug, Clone, Copy)] +pub struct Config { + pub sma_periods: SmaPeriods, + pub fee_thresholds: FeeThresholds, +} + +#[cfg(feature = "test-helpers")] +impl Default for Config { + fn default() -> Self { + Config { + sma_periods: SmaPeriods { + short: 1.try_into().expect("not zero"), + long: 2.try_into().expect("not zero"), + }, + fee_thresholds: FeeThresholds { + max_l2_blocks_behind: 100.try_into().unwrap(), + ..FeeThresholds::default() + }, + } + } +} + +#[derive(Debug, Clone, Copy)] +pub struct SmaPeriods { + pub short: NonZeroU64, + pub long: NonZeroU64, +} + +#[derive(Debug, Clone, Copy)] +pub struct FeeThresholds { + pub max_l2_blocks_behind: NonZeroU32, + pub start_discount_percentage: Percentage, + pub end_premium_percentage: Percentage, + pub always_acceptable_fee: u128, +} + +#[cfg(feature = "test-helpers")] +impl Default for FeeThresholds { + fn default() -> Self { + Self { + max_l2_blocks_behind: NonZeroU32::MAX, + start_discount_percentage: Percentage::ZERO, + end_premium_percentage: Percentage::ZERO, + always_acceptable_fee: u128::MAX, + } + } +} + +impl SendOrWaitDecider for FeeTracker

{ + async fn should_send_blob_tx( + &self, + num_blobs: u32, + num_l2_blocks_behind: u32, + at_l1_height: u64, + ) -> Result { + if self.too_far_behind(num_l2_blocks_behind) { + info!("Sending because we've fallen behind by {} which is more than the configured maximum of {}", num_l2_blocks_behind, self.config.fee_thresholds.max_l2_blocks_behind); + return Ok(true); + } + + // opted out of validating that num_blobs <= 6, it's not this fn's problem if the caller + // wants to send more than 6 blobs + let last_n_blocks = |n| Self::last_n_blocks(at_l1_height, n); + + let short_term_sma = self + .fee_analytics + .calculate_sma(last_n_blocks(self.config.sma_periods.short)) + .await?; + + let long_term_sma = self + .fee_analytics + .calculate_sma(last_n_blocks(self.config.sma_periods.long)) + .await?; + + let short_term_tx_fee = Self::calculate_blob_tx_fee(num_blobs, &short_term_sma); + + if self.fee_always_acceptable(short_term_tx_fee) { + info!("Sending because: short term price {} is deemed always acceptable since it is <= {}", short_term_tx_fee, self.config.fee_thresholds.always_acceptable_fee); + return Ok(true); + } + + let long_term_tx_fee = Self::calculate_blob_tx_fee(num_blobs, &long_term_sma); + let max_upper_tx_fee = Self::calculate_max_upper_fee( + &self.config.fee_thresholds, + long_term_tx_fee, + num_l2_blocks_behind, + ); + + info!("short_term_tx_fee: {short_term_tx_fee}, long_term_tx_fee: {long_term_tx_fee}, max_upper_tx_fee: {max_upper_tx_fee}"); + + let should_send = short_term_tx_fee < max_upper_tx_fee; + + if should_send { + info!( + "Sending because short term price {} is lower than the max upper fee {}", + short_term_tx_fee, max_upper_tx_fee + ); + } else { + info!( + "Not sending because short term price {} is higher than the max upper fee {}", + short_term_tx_fee, max_upper_tx_fee + ); + } + + Ok(should_send) + } +} + +#[derive(Default, Copy, Clone, Debug, PartialEq)] +pub struct Percentage(f64); + +impl TryFrom for Percentage { + type Error = Error; + + fn try_from(value: f64) -> std::result::Result { + if value < 0. { + return Err(Error::Other(format!("Invalid percentage value {value}"))); + } + + Ok(Self(value)) + } +} + +impl From for f64 { + fn from(value: Percentage) -> Self { + value.0 + } +} + +impl Percentage { + pub const ZERO: Self = Percentage(0.); + pub const PPM: u128 = 1_000_000; + + pub fn ppm(&self) -> u128 { + (self.0 * 1_000_000.) as u128 + } +} + +#[derive(Debug, Clone)] +struct Metrics { + current_blob_tx_fee: IntGauge, + short_term_blob_tx_fee: IntGauge, + long_term_blob_tx_fee: IntGauge, +} + +impl Default for Metrics { + fn default() -> Self { + let current_blob_tx_fee = IntGauge::with_opts(Opts::new( + "current_blob_tx_fee", + "The current fee for a transaction with 6 blobs", + )) + .expect("metric config to be correct"); + + let short_term_blob_tx_fee = IntGauge::with_opts(Opts::new( + "short_term_blob_tx_fee", + "The short term fee for a transaction with 6 blobs", + )) + .expect("metric config to be correct"); + + let long_term_blob_tx_fee = IntGauge::with_opts(Opts::new( + "long_term_blob_tx_fee", + "The long term fee for a transaction with 6 blobs", + )) + .expect("metric config to be correct"); + + Self { + current_blob_tx_fee, + short_term_blob_tx_fee, + long_term_blob_tx_fee, + } + } +} + +impl

<P> RegistersMetrics for FeeTracker<P>

{ + fn metrics(&self) -> Vec<Box<dyn Collector>> { + vec![ + Box::new(self.metrics.current_blob_tx_fee.clone()), + Box::new(self.metrics.short_term_blob_tx_fee.clone()), + Box::new(self.metrics.long_term_blob_tx_fee.clone()), + ] + } +} + +#[derive(Clone)] +pub struct FeeTracker<P>

{ + fee_analytics: FeeAnalytics<P>

, + config: Config, + metrics: Metrics, +} + +impl<P: crate::fee_tracker::port::l1::Api + Send + Sync> FeeTracker<P>

{ + fn too_far_behind(&self, num_l2_blocks_behind: u32) -> bool { + num_l2_blocks_behind >= self.config.fee_thresholds.max_l2_blocks_behind.get() + } + + fn fee_always_acceptable(&self, short_term_tx_fee: u128) -> bool { + short_term_tx_fee <= self.config.fee_thresholds.always_acceptable_fee + } + + fn calculate_max_upper_fee( + fee_thresholds: &FeeThresholds, + fee: u128, + num_l2_blocks_behind: u32, + ) -> u128 { + let max_blocks_behind = u128::from(fee_thresholds.max_l2_blocks_behind.get()); + let blocks_behind = u128::from(num_l2_blocks_behind); + + debug_assert!( + blocks_behind <= max_blocks_behind, + "blocks_behind ({}) should not exceed max_blocks_behind ({}), it should have been handled earlier", + blocks_behind, + max_blocks_behind + ); + + let start_discount_ppm = min( + fee_thresholds.start_discount_percentage.ppm(), + Percentage::PPM, + ); + let end_premium_ppm = fee_thresholds.end_premium_percentage.ppm(); + + // 1. The highest we're initially willing to go: eg. 100% - 20% = 80% + let base_multiplier = Percentage::PPM.saturating_sub(start_discount_ppm); + + // 2. How late are we: eg. late enough to add 25% to our base multiplier + let premium_increment = Self::calculate_premium_increment( + start_discount_ppm, + end_premium_ppm, + blocks_behind, + max_blocks_behind, + ); + + // 3. Total multiplier consist of the base and the premium increment: eg. 80% + 25% = 105% + let multiplier_ppm = min( + base_multiplier.saturating_add(premium_increment), + Percentage::PPM + end_premium_ppm, + ); + + info!("start_discount_ppm: {start_discount_ppm}, end_premium_ppm: {end_premium_ppm}, base_multiplier: {base_multiplier}, premium_increment: {premium_increment}, multiplier_ppm: {multiplier_ppm}"); + + // 3. Final fee: eg. 105% of the base fee + fee.saturating_mul(multiplier_ppm) + .saturating_div(Percentage::PPM) + } + + fn calculate_premium_increment( + start_discount_ppm: u128, + end_premium_ppm: u128, + blocks_behind: u128, + max_blocks_behind: u128, + ) -> u128 { + let total_ppm = start_discount_ppm.saturating_add(end_premium_ppm); + + let proportion = if max_blocks_behind == 0 { + 0 + } else { + blocks_behind + .saturating_mul(Percentage::PPM) + .saturating_div(max_blocks_behind) + }; + + total_ppm + .saturating_mul(proportion) + .saturating_div(Percentage::PPM) + } + + // TODO: Segfault maybe dont leak so much eth abstractions + fn calculate_blob_tx_fee(num_blobs: u32, fees: &Fees) -> u128 { + const DATA_GAS_PER_BLOB: u128 = 131_072u128; + const INTRINSIC_GAS: u128 = 21_000u128; + + let base_fee = INTRINSIC_GAS.saturating_mul(fees.base_fee_per_gas); + let blob_fee = fees + .base_fee_per_blob_gas + .saturating_mul(num_blobs as u128) + .saturating_mul(DATA_GAS_PER_BLOB); + let reward_fee = fees.reward.saturating_mul(INTRINSIC_GAS); + + base_fee.saturating_add(blob_fee).saturating_add(reward_fee) + } + + fn last_n_blocks(current_block: u64, n: NonZeroU64) -> RangeInclusive { + current_block.saturating_sub(n.get().saturating_sub(1))..=current_block + } + + pub async fn update_metrics(&self) -> Result<()> { + let latest_fees = self.fee_analytics.latest_fees().await?; + let short_term_sma = self + .fee_analytics + .calculate_sma(Self::last_n_blocks( + latest_fees.height, + self.config.sma_periods.short, + )) + .await?; + + let long_term_sma = self + .fee_analytics + .calculate_sma(Self::last_n_blocks( + latest_fees.height, + self.config.sma_periods.long, + )) + .await?; + + let calc_fee = + |fees: &Fees| i64::try_from(Self::calculate_blob_tx_fee(6, fees)).unwrap_or(i64::MAX); + + 
self.metrics + .current_blob_tx_fee + .set(calc_fee(&latest_fees.fees)); + self.metrics + .short_term_blob_tx_fee + .set(calc_fee(&short_term_sma)); + self.metrics + .long_term_blob_tx_fee + .set(calc_fee(&long_term_sma)); + + Ok(()) + } +} + +impl

<P> FeeTracker<P>

{ + pub fn new(fee_provider: P, config: Config) -> Self { + Self { + fee_analytics: FeeAnalytics::new(fee_provider), + config, + metrics: Metrics::default(), + } + } +} + +impl

<P> Runner for FeeTracker<P>

+where + P: crate::fee_tracker::port::l1::Api + Send + Sync, +{ + async fn run(&mut self) -> Result<()> { + self.update_metrics().await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use crate::fee_tracker::port::l1::testing::ConstantFeeApi; + + use super::*; + use test_case::test_case; + + #[test_case( + // Test Case 1: No blocks behind, no discount or premium + FeeThresholds { + max_l2_blocks_behind: 100.try_into().unwrap(), + always_acceptable_fee: 0, + ..Default::default() + }, + 1000, + 0, + 1000; + "No blocks behind, multiplier should be 100%" + )] + #[test_case( + FeeThresholds { + max_l2_blocks_behind: 100.try_into().unwrap(), + start_discount_percentage: 0.20.try_into().unwrap(), + end_premium_percentage: 0.25.try_into().unwrap(), + always_acceptable_fee: 0, + }, + 2000, + 50, + 2050; + "Half blocks behind with discount and premium" + )] + #[test_case( + FeeThresholds { + max_l2_blocks_behind: 100.try_into().unwrap(), + start_discount_percentage: 0.25.try_into().unwrap(), + always_acceptable_fee: 0, + ..Default::default() + }, + 800, + 50, + 700; + "Start discount only, no premium" + )] + #[test_case( + FeeThresholds { + max_l2_blocks_behind: 100.try_into().unwrap(), + end_premium_percentage: 0.30.try_into().unwrap(), + always_acceptable_fee: 0, + ..Default::default() + }, + 1000, + 50, + 1150; + "End premium only, no discount" + )] + #[test_case( + // Test Case 8: High fee with premium + FeeThresholds { + max_l2_blocks_behind: 100.try_into().unwrap(), + start_discount_percentage: 0.10.try_into().unwrap(), + end_premium_percentage: 0.20.try_into().unwrap(), + always_acceptable_fee: 0, + }, + 10_000, + 99, + 11970; + "High fee with premium" + )] + #[test_case( + FeeThresholds { + max_l2_blocks_behind: 100.try_into().unwrap(), + start_discount_percentage: 1.50.try_into().unwrap(), // 150% + end_premium_percentage: 0.20.try_into().unwrap(), + always_acceptable_fee: 0, + }, + 1000, + 1, + 12; + "Discount exceeds 100%, should be capped to 100%" +)] + fn test_calculate_max_upper_fee( + fee_thresholds: FeeThresholds, + fee: u128, + num_l2_blocks_behind: u32, + expected_max_upper_fee: u128, + ) { + let max_upper_fee = FeeTracker::::calculate_max_upper_fee( + &fee_thresholds, + fee, + num_l2_blocks_behind, + ); + + assert_eq!( + max_upper_fee, expected_max_upper_fee, + "Expected max_upper_fee to be {}, but got {}", + expected_max_upper_fee, max_upper_fee + ); + } +} diff --git a/packages/services/src/lib.rs b/packages/services/src/lib.rs index 0e1391a1..48123b0a 100644 --- a/packages/services/src/lib.rs +++ b/packages/services/src/lib.rs @@ -2,6 +2,7 @@ pub mod block_bundler; pub mod block_committer; pub mod block_importer; pub mod cost_reporter; +pub mod fee_tracker; pub mod health_reporter; pub mod state_committer; pub mod state_listener; @@ -22,7 +23,7 @@ pub use block_bundler::{ pub use state_committer::service::{Config as StateCommitterConfig, StateCommitter}; use types::InvalidL1Height; -#[derive(thiserror::Error, Debug)] +#[derive(thiserror::Error, Debug, PartialEq)] pub enum Error { #[error("{0}")] Other(String), diff --git a/packages/services/src/state_committer.rs b/packages/services/src/state_committer.rs index ca2dcf3f..de728ad8 100644 --- a/packages/services/src/state_committer.rs +++ b/packages/services/src/state_committer.rs @@ -6,6 +6,10 @@ pub mod service { Result, Runner, }; use itertools::Itertools; + use metrics::{ + prometheus::{core::Collector, IntGauge, Opts}, + RegistersMetrics, + }; use tracing::info; // src/config.rs @@ -16,7 +20,6 @@ pub mod service { pub 
fragment_accumulation_timeout: Duration, pub fragments_to_accumulate: NonZeroUsize, pub gas_bump_timeout: Duration, - pub tx_max_fee: u128, } #[cfg(feature = "test-helpers")] @@ -27,22 +30,57 @@ pub mod service { fragment_accumulation_timeout: Duration::from_secs(0), fragments_to_accumulate: 1.try_into().unwrap(), gas_bump_timeout: Duration::from_secs(300), - tx_max_fee: 1_000_000_000, } } } + #[allow(async_fn_in_trait)] + #[trait_variant::make(Send)] + pub trait SendOrWaitDecider { + async fn should_send_blob_tx( + &self, + num_blobs: u32, + num_l2_blocks_behind: u32, + at_l1_height: u64, + ) -> Result; + } + + struct Metrics { + num_l2_blocks_behind: IntGauge, + } + + impl Default for Metrics { + fn default() -> Self { + let num_l2_blocks_behind = IntGauge::with_opts(Opts::new( + "num_l2_blocks_behind", + "How many L2 blocks have been produced since the starting height of the oldest bundle we're committing", + )).expect("metric config to be correct"); + + Self { + num_l2_blocks_behind, + } + } + } + + impl RegistersMetrics for StateCommitter { + fn metrics(&self) -> Vec> { + vec![Box::new(self.metrics.num_l2_blocks_behind.clone())] + } + } + /// The `StateCommitter` is responsible for committing state fragments to L1. - pub struct StateCommitter { + pub struct StateCommitter { l1_adapter: L1, fuel_api: FuelApi, storage: Db, config: Config, clock: Clock, startup_time: DateTime, + decider: D, + metrics: Metrics, } - impl StateCommitter + impl StateCommitter where Clock: crate::state_committer::port::Clock, { @@ -53,8 +91,10 @@ pub mod service { storage: Db, config: Config, clock: Clock, + decider: Decider, ) -> Self { let startup_time = clock.now(); + Self { l1_adapter, fuel_api, @@ -62,16 +102,19 @@ pub mod service { config, clock, startup_time, + decider, + metrics: Default::default(), } } } - impl StateCommitter + impl StateCommitter where - L1: crate::state_committer::port::l1::Api, + L1: crate::state_committer::port::l1::Api + Send + Sync, FuelApi: crate::state_committer::port::fuel::Api, Db: crate::state_committer::port::Storage, Clock: crate::state_committer::port::Clock, + Decider: SendOrWaitDecider, { async fn get_reference_time(&self) -> Result> { Ok(self @@ -90,11 +133,43 @@ pub mod service { Ok(std_elapsed >= self.config.fragment_accumulation_timeout) } + async fn should_send_tx(&self, fragments: &NonEmpty) -> Result { + let l1_height = self.l1_adapter.current_height().await?; + let l2_height = self.fuel_api.latest_height().await?; + + let num_l2_blocks_behind = self.num_l2_blocks_behind(fragments, l2_height); + self.update_l2_blocks_behind_metric(num_l2_blocks_behind); + + self.decider + .should_send_blob_tx( + u32::try_from(fragments.len()).expect("not to send more than u32::MAX blobs"), + num_l2_blocks_behind, + l1_height, + ) + .await + } + + fn num_l2_blocks_behind( + &self, + fragments: &NonEmpty, + l2_height: u32, + ) -> u32 { + let oldest_l2_block_in_fragments = fragments + .minimum_by_key(|b| b.oldest_block_in_bundle) + .oldest_block_in_bundle; + + l2_height.saturating_sub(oldest_l2_block_in_fragments) + } + async fn submit_fragments( &self, fragments: NonEmpty, previous_tx: Option, ) -> Result<()> { + if !self.should_send_tx(&fragments).await? 
{ + info!("decided against sending fragments due to high fees"); + return Ok(()); + } info!("about to send at most {} fragments", fragments.len()); let data = fragments.clone().map(|f| f.fragment); @@ -154,7 +229,23 @@ pub mod service { .oldest_nonfinalized_fragments(starting_height, 6) .await?; - Ok(NonEmpty::collect(existing_fragments)) + let fragments = NonEmpty::collect(existing_fragments); + + if let Some(fragments) = fragments.as_ref() { + // Tracking the metric here as well to get updates more often -- because + // submit_fragments might not be called + self.update_l2_blocks_behind_metric( + self.num_l2_blocks_behind(fragments, latest_height), + ); + } + + Ok(fragments) + } + + fn update_l2_blocks_behind_metric(&self, l2_blocks_behind: u32) { + self.metrics + .num_l2_blocks_behind + .set(l2_blocks_behind as i64); } async fn should_submit_fragments(&self, fragment_count: NonZeroUsize) -> Result { @@ -186,6 +277,7 @@ pub mod service { self.submit_fragments(fragments, None).await?; } } + Ok(()) } @@ -232,12 +324,13 @@ pub mod service { } } - impl Runner for StateCommitter + impl Runner for StateCommitter where L1: crate::state_committer::port::l1::Api + Send + Sync, FuelApi: crate::state_committer::port::fuel::Api + Send + Sync, Db: crate::state_committer::port::Storage + Clone + Send + Sync, Clock: crate::state_committer::port::Clock + Send + Sync, + Decider: SendOrWaitDecider + Send + Sync, { async fn run(&mut self) -> Result<()> { if self.storage.has_nonfinalized_txs().await? { @@ -260,13 +353,13 @@ pub mod port { }; pub mod l1 { + use nonempty::NonEmpty; use crate::{ types::{BlockSubmissionTx, Fragment, FragmentsSubmitted, L1Tx}, Result, }; - #[allow(async_fn_in_trait)] #[trait_variant::make(Send)] #[cfg_attr(feature = "test-helpers", mockall::automock)] @@ -278,6 +371,7 @@ pub mod port { #[trait_variant::make(Send)] #[cfg_attr(feature = "test-helpers", mockall::automock)] pub trait Api { + async fn current_height(&self) -> Result; async fn submit_state_fragments( &self, fragments: NonEmpty, diff --git a/packages/services/src/types/storage.rs b/packages/services/src/types/storage.rs index 3f93e8ca..368c24f3 100644 --- a/packages/services/src/types/storage.rs +++ b/packages/services/src/types/storage.rs @@ -16,6 +16,7 @@ pub struct BundleFragment { pub id: NonNegative, pub idx: NonNegative, pub bundle_id: NonNegative, + pub oldest_block_in_bundle: u32, pub fragment: Fragment, } diff --git a/packages/services/tests/fee_tracker.rs b/packages/services/tests/fee_tracker.rs new file mode 100644 index 00000000..6936791b --- /dev/null +++ b/packages/services/tests/fee_tracker.rs @@ -0,0 +1,380 @@ +use services::fee_tracker::port::l1::testing::PreconfiguredFeeApi; +use services::fee_tracker::port::l1::Api; +use services::fee_tracker::service::FeeThresholds; +use services::fee_tracker::service::FeeTracker; +use services::fee_tracker::service::Percentage; +use services::fee_tracker::service::SmaPeriods; +use services::fee_tracker::{port::l1::Fees, service::Config}; +use services::state_committer::service::SendOrWaitDecider; +use test_case::test_case; + +fn generate_fees(config: Config, old_fees: Fees, new_fees: Fees) -> Vec<(u64, Fees)> { + let older_fees = std::iter::repeat_n( + old_fees, + (config.sma_periods.long.get() - config.sma_periods.short.get()) as usize, + ); + let newer_fees = std::iter::repeat_n(new_fees, config.sma_periods.short.get() as usize); + + older_fees + .chain(newer_fees) + .enumerate() + .map(|(i, f)| (i as u64, f)) + .collect() +} + +#[test_case( + Fees { 
base_fee_per_gas: 5000, reward: 5000, base_fee_per_blob_gas: 5000 }, + Fees { base_fee_per_gas: 3000, reward: 3000, base_fee_per_blob_gas: 3000 }, + 6, + Config { + sma_periods: services::fee_tracker::service::SmaPeriods { short: 2.try_into().unwrap(), long: 6 .try_into().unwrap()}, + fee_thresholds: FeeThresholds { + max_l2_blocks_behind: 100.try_into().unwrap(), + always_acceptable_fee: 0, + ..Default::default() + }, + }, + 0, // not behind at all + true; + "Should send because all short-term fees are lower than long-term" + )] +#[test_case( + Fees { base_fee_per_gas: 3000, reward: 3000, base_fee_per_blob_gas: 3000 }, + Fees { base_fee_per_gas: 5000, reward: 5000, base_fee_per_blob_gas: 5000 }, + 6, + Config { + sma_periods: SmaPeriods { short: 2.try_into().unwrap(), long: 6 .try_into().unwrap()}, + fee_thresholds: FeeThresholds { + max_l2_blocks_behind: 100.try_into().unwrap(), + always_acceptable_fee: 0, + ..Default::default() + }, + }, + 0, + false; + "Should not send because all short-term fees are higher than long-term" + )] +#[test_case( + Fees { base_fee_per_gas: 3000, reward: 3000, base_fee_per_blob_gas: 3000 }, + Fees { base_fee_per_gas: 5000, reward: 5000, base_fee_per_blob_gas: 5000 }, + 6, + Config { + sma_periods: SmaPeriods { short: 2.try_into().unwrap(), long: 6 .try_into().unwrap()}, + fee_thresholds: FeeThresholds { + always_acceptable_fee: (21_000 * (5000 + 5000)) + (6 * 131_072 * 5000) + 1, + max_l2_blocks_behind: 100.try_into().unwrap(), + ..Default::default() + } + }, + 0, + true; + "Should send since short-term fee less than always_acceptable_fee" + )] +#[test_case( + Fees { base_fee_per_gas: 2000, reward: 10000, base_fee_per_blob_gas: 1000 }, + Fees { base_fee_per_gas: 1500, reward: 10000, base_fee_per_blob_gas: 1000 }, + 5, + Config { + sma_periods: SmaPeriods { short: 2.try_into().unwrap(), long: 6 .try_into().unwrap()}, + fee_thresholds: FeeThresholds { + max_l2_blocks_behind: 100.try_into().unwrap(), + always_acceptable_fee: 0, + ..Default::default() + } + }, + 0, + true; + "Should send because short-term base_fee_per_gas is lower" + )] +#[test_case( + Fees { base_fee_per_gas: 2000, reward: 10000, base_fee_per_blob_gas: 1000 }, + Fees { base_fee_per_gas: 2500, reward: 10000, base_fee_per_blob_gas: 1000 }, + 5, + Config { + sma_periods: SmaPeriods { short: 2.try_into().unwrap(), long: 6.try_into().unwrap()}, + fee_thresholds: FeeThresholds { + max_l2_blocks_behind: 100.try_into().unwrap(), + always_acceptable_fee: 0, + ..Default::default() + } + }, + 0, + false; + "Should not send because short-term base_fee_per_gas is higher" + )] +#[test_case( + Fees { base_fee_per_gas: 2000, reward: 3000, base_fee_per_blob_gas: 1000 }, + Fees { base_fee_per_gas: 2000, reward: 3000, base_fee_per_blob_gas: 900 }, + 5, + Config { + sma_periods: SmaPeriods { short: 2.try_into().unwrap(), long: 6 .try_into().unwrap()}, + fee_thresholds: FeeThresholds { + max_l2_blocks_behind: 100.try_into().unwrap(), + always_acceptable_fee: 0, + ..Default::default() + } + }, + 0, + true; + "Should send because short-term base_fee_per_blob_gas is lower" + )] +#[test_case( + Fees { base_fee_per_gas: 2000, reward: 3000, base_fee_per_blob_gas: 1000 }, + Fees { base_fee_per_gas: 2000, reward: 3000, base_fee_per_blob_gas: 1100 }, + 5, + Config { + sma_periods: SmaPeriods { short: 2.try_into().unwrap(), long: 6 .try_into().unwrap()}, + fee_thresholds: FeeThresholds { + max_l2_blocks_behind: 100.try_into().unwrap(), + always_acceptable_fee: 0, + ..Default::default() + } + }, + 0, + false; + "Should not 
send because short-term base_fee_per_blob_gas is higher" + )] +#[test_case( + Fees { base_fee_per_gas: 2000, reward: 10000, base_fee_per_blob_gas: 1000 }, + Fees { base_fee_per_gas: 2000, reward: 9000, base_fee_per_blob_gas: 1000 }, + 5, + Config { + sma_periods: SmaPeriods { short: 2.try_into().unwrap(), long: 6.try_into().unwrap()}, + fee_thresholds: FeeThresholds { + max_l2_blocks_behind: 100.try_into().unwrap(), + always_acceptable_fee: 0, + ..Default::default() + } + }, + 0, + true; + "Should send because short-term reward is lower" + )] +#[test_case( + Fees { base_fee_per_gas: 2000, reward: 10000, base_fee_per_blob_gas: 1000 }, + Fees { base_fee_per_gas: 2000, reward: 11000, base_fee_per_blob_gas: 1000 }, + 5, + Config { + sma_periods: SmaPeriods { short: 2.try_into().unwrap(), long: 6 .try_into().unwrap()}, + fee_thresholds: FeeThresholds { + max_l2_blocks_behind: 100.try_into().unwrap(), + always_acceptable_fee: 0, + ..Default::default() + } + }, + 0, + false; + "Should not send because short-term reward is higher" + )] +#[test_case( + // Multiple short-term fees are lower + Fees { base_fee_per_gas: 4000, reward: 8000, base_fee_per_blob_gas: 4000 }, + Fees { base_fee_per_gas: 3000, reward: 7000, base_fee_per_blob_gas: 3500 }, + 6, + Config { + sma_periods: SmaPeriods { short: 2.try_into().unwrap(), long: 6.try_into().unwrap()}, + fee_thresholds: FeeThresholds { + max_l2_blocks_behind: 100.try_into().unwrap(), + always_acceptable_fee: 0, + ..Default::default() + } + }, + 0, + true; + "Should send because multiple short-term fees are lower" + )] +#[test_case( + Fees { base_fee_per_gas: 5000, reward: 5000, base_fee_per_blob_gas: 5000 }, + Fees { base_fee_per_gas: 5000, reward: 5000, base_fee_per_blob_gas: 5000 }, + 6, + Config { + sma_periods: SmaPeriods { short: 2.try_into().unwrap(), long: 6.try_into().unwrap()}, + fee_thresholds: FeeThresholds { + max_l2_blocks_behind: 100.try_into().unwrap(), + always_acceptable_fee: 0, + ..Default::default() + } + }, + 0, + false; + "Should not send because all fees are identical and no tolerance" + )] +#[test_case( + // Zero blobs scenario: blob fee differences don't matter + Fees { base_fee_per_gas: 3000, reward: 6000, base_fee_per_blob_gas: 5000 }, + Fees { base_fee_per_gas: 2500, reward: 5500, base_fee_per_blob_gas: 5000 }, + 0, + Config { + sma_periods: SmaPeriods { short: 2.try_into().unwrap(), long: 6.try_into().unwrap()}, + fee_thresholds: FeeThresholds { + max_l2_blocks_behind: 100.try_into().unwrap(), + always_acceptable_fee: 0, + ..Default::default() + } + }, + 0, + true; + "Zero blobs: short-term base_fee_per_gas and reward are lower, send" + )] +#[test_case( + // Zero blobs but short-term reward is higher + Fees { base_fee_per_gas: 3000, reward: 6000, base_fee_per_blob_gas: 5000 }, + Fees { base_fee_per_gas: 3000, reward: 7000, base_fee_per_blob_gas: 5000 }, + 0, + Config { + sma_periods: SmaPeriods { short: 2.try_into().unwrap(), long: 6.try_into().unwrap()}, + fee_thresholds: FeeThresholds { + max_l2_blocks_behind: 100.try_into().unwrap(), + always_acceptable_fee: 0, + ..Default::default() + } + }, + 0, + false; + "Zero blobs: short-term reward is higher, don't send" + )] +#[test_case( + Fees { base_fee_per_gas: 3000, reward: 6000, base_fee_per_blob_gas: 5000 }, + Fees { base_fee_per_gas: 2000, reward: 6000, base_fee_per_blob_gas: 50_000_000 }, + 0, + Config { + sma_periods: SmaPeriods { short: 2.try_into().unwrap(), long: 6.try_into().unwrap()}, + fee_thresholds: FeeThresholds { + max_l2_blocks_behind: 100.try_into().unwrap(), + 
always_acceptable_fee: 0, + ..Default::default() + } + }, + 0, + true; + "Zero blobs: ignore blob fee, short-term base_fee_per_gas is lower, send" + )] +// Initially not send, but as num_l2_blocks_behind increases, acceptance grows. +#[test_case( + // Initially short-term fee too high compared to long-term (strict scenario), no send at t=0 + Fees { base_fee_per_gas: 6000, reward: 0, base_fee_per_blob_gas: 6000 }, + Fees { base_fee_per_gas: 7000, reward: 0, base_fee_per_blob_gas: 7000 }, + 1, + Config { + sma_periods: SmaPeriods { short: 2.try_into().unwrap(), long: 6.try_into().unwrap()}, + fee_thresholds: FeeThresholds { + max_l2_blocks_behind: 100.try_into().unwrap(), + start_discount_percentage: Percentage::try_from(0.20).unwrap(), + end_premium_percentage: Percentage::try_from(0.20).unwrap(), + always_acceptable_fee: 0, + }, + }, + 0, + false; + "Early: short-term expensive, not send" + )] +#[test_case( + // At max_l2_blocks_behind, send regardless + Fees { base_fee_per_gas: 6000, reward: 0, base_fee_per_blob_gas: 6000 }, + Fees { base_fee_per_gas: 7000, reward: 0, base_fee_per_blob_gas: 7000 }, + 1, + Config { + sma_periods: SmaPeriods { short: 2.try_into().unwrap(), long: 6.try_into().unwrap()}, + fee_thresholds: FeeThresholds { + max_l2_blocks_behind: 100.try_into().unwrap(), + start_discount_percentage: 0.20.try_into().unwrap(), + end_premium_percentage: 0.20.try_into().unwrap(), + always_acceptable_fee: 0, + } + }, + 100, + true; + "Later: after max wait, send regardless" + )] +#[test_case( + Fees { base_fee_per_gas: 6000, reward: 0, base_fee_per_blob_gas: 6000 }, + Fees { base_fee_per_gas: 7000, reward: 0, base_fee_per_blob_gas: 7000 }, + 1, + Config { + sma_periods: SmaPeriods { short: 2.try_into().unwrap(), long: 6 .try_into().unwrap()}, + fee_thresholds: FeeThresholds { + max_l2_blocks_behind: 100.try_into().unwrap(), + start_discount_percentage: 0.20.try_into().unwrap(), + end_premium_percentage: 0.20.try_into().unwrap(), + always_acceptable_fee: 0, + }, + }, + 80, + true; + "Mid-wait: increased tolerance allows acceptance" + )] +#[test_case( + // Short-term fee is huge, but always_acceptable_fee is large, so send immediately + Fees { base_fee_per_gas: 100_000, reward: 0, base_fee_per_blob_gas: 100_000 }, + Fees { base_fee_per_gas: 2_000_000, reward: 1_000_000, base_fee_per_blob_gas: 20_000_000 }, + 1, + Config { + sma_periods: SmaPeriods { short: 2.try_into().unwrap(), long: 6 .try_into().unwrap()}, + fee_thresholds: FeeThresholds { + max_l2_blocks_behind: 100.try_into().unwrap(), + start_discount_percentage: 0.20.try_into().unwrap(), + end_premium_percentage: 0.20.try_into().unwrap(), + always_acceptable_fee: 2_700_000_000_000 + }, + }, + 0, + true; + "Always acceptable fee triggers immediate send" + )] +#[tokio::test] +async fn parameterized_send_or_wait_tests( + old_fees: Fees, + new_fees: Fees, + num_blobs: u32, + config: Config, + num_l2_blocks_behind: u32, + expected_decision: bool, +) { + let fees = generate_fees(config, old_fees, new_fees); + let fees_provider = PreconfiguredFeeApi::new(fees); + let current_block_height = fees_provider.current_height().await.unwrap(); + + let sut = FeeTracker::new(fees_provider, config); + + let should_send = sut + .should_send_blob_tx(num_blobs, num_l2_blocks_behind, current_block_height) + .await + .unwrap(); + + assert_eq!( + should_send, expected_decision, + "For num_blobs={num_blobs}, num_l2_blocks_behind={num_l2_blocks_behind}, config={config:?}: Expected decision: {expected_decision}, got: {should_send}", + ); +} + 
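// Illustrative sketch, not part of the patch: recomputing by hand the arithmetic the
// cases above rely on.
//
// Part 1 assumes the layout produced by `generate_fees` with short = 2 and long = 6,
// an old fee of 5000 and a new fee of 3000 for every fee component, and 6 blobs. The
// real code averages each fee component separately via FeeAnalytics; they are collapsed
// into a single number here only because the fixtures keep all components equal.
//
// Part 2 recomputes the "Half blocks behind with discount and premium" expectation
// (fee = 2000, 20% start discount, 25% end premium, 50 of 100 blocks behind -> 2050)
// using the same parts-per-million steps as `calculate_max_upper_fee`.
fn main() {
    // Part 1: short-term vs long-term SMA decision.
    const DATA_GAS_PER_BLOB: u128 = 131_072;
    const INTRINSIC_GAS: u128 = 21_000;

    // Same shape as calculate_blob_tx_fee: intrinsic + blob + reward portions.
    let blob_tx_fee = |fee: u128, num_blobs: u128| {
        INTRINSIC_GAS * fee + fee * num_blobs * DATA_GAS_PER_BLOB + fee * INTRINSIC_GAS
    };

    // generate_fees(short = 2, long = 6): heights 0..=3 carry the old fee, 4..=5 the new one.
    let history: [u128; 6] = [5000, 5000, 5000, 5000, 3000, 3000];
    let short_sma = history[4..].iter().sum::<u128>() / 2; // last 2 blocks -> 3000
    let long_sma = history.iter().sum::<u128>() / 6; // last 6 blocks -> 4333

    // With no discount or premium configured the max upper fee equals the long-term fee,
    // so the decider sends exactly when the short-term fee is below it.
    assert!(blob_tx_fee(short_sma, 6) < blob_tx_fee(long_sma, 6));

    // Part 2: max upper fee with discount/premium, in parts per million.
    const PPM: u128 = 1_000_000;
    let (fee, start_discount_ppm, end_premium_ppm) = (2000u128, 200_000u128, 250_000u128);
    let (blocks_behind, max_blocks_behind) = (50u128, 100u128);

    let base_multiplier = PPM - start_discount_ppm; // 100% - 20% = 80%
    let proportion = blocks_behind * PPM / max_blocks_behind; // halfway to the deadline
    let premium_increment = (start_discount_ppm + end_premium_ppm) * proportion / PPM; // 45% * 50% = 22.5%
    let multiplier_ppm = (base_multiplier + premium_increment).min(PPM + end_premium_ppm); // capped at 125%

    // 2000 * 102.5% = 2050, matching the expected value in the test case.
    assert_eq!(fee * multiplier_ppm / PPM, 2050);
}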
+#[tokio::test] +async fn test_send_when_too_far_behind_and_fee_provider_fails() { + // given + let config = Config { + sma_periods: SmaPeriods { + short: 2.try_into().unwrap(), + long: 6.try_into().unwrap(), + }, + fee_thresholds: FeeThresholds { + max_l2_blocks_behind: 10.try_into().unwrap(), + always_acceptable_fee: 0, + ..Default::default() + }, + }; + + // having no fees will make the validation in fee analytics fail + let fee_provider = PreconfiguredFeeApi::new(vec![]); + let sut = FeeTracker::new(fee_provider, config); + + // when + let should_send = sut + .should_send_blob_tx(1, 20, 100) + .await + .expect("Should send despite fee provider failure"); + + // then + assert!( + should_send, + "Should send because too far behind, regardless of fee provider status" + ); +} diff --git a/packages/services/tests/state_committer.rs b/packages/services/tests/state_committer.rs index 831dbeb5..345fe363 100644 --- a/packages/services/tests/state_committer.rs +++ b/packages/services/tests/state_committer.rs @@ -1,8 +1,13 @@ use services::{ + fee_tracker::{ + port::l1::Fees, + service::{Config as FeeTrackerConfig, FeeThresholds, SmaPeriods}, + }, types::{L1Tx, NonEmpty}, Result, Runner, StateCommitter, StateCommitterConfig, }; use std::time::Duration; +use test_helpers::{noop_fee_tracker, preconfigured_fee_tracker}; #[tokio::test] async fn submits_fragments_when_required_count_accumulated() -> Result<()> { @@ -12,7 +17,7 @@ async fn submits_fragments_when_required_count_accumulated() -> Result<()> { let fragments = setup.insert_fragments(0, 4).await; let tx_hash = [0; 32]; - let l1_mock_submit = test_helpers::mocks::l1::expects_state_submissions([( + let mut l1_mock_submit = test_helpers::mocks::l1::expects_state_submissions([( Some(NonEmpty::from_vec(fragments.clone()).unwrap()), L1Tx { hash: tx_hash, @@ -20,6 +25,9 @@ async fn submits_fragments_when_required_count_accumulated() -> Result<()> { ..Default::default() }, )]); + l1_mock_submit + .expect_current_height() + .returning(|| Box::pin(async { Ok(0) })); let fuel_mock = test_helpers::mocks::fuel::latest_height_is(0); let mut state_committer = StateCommitter::new( @@ -33,6 +41,7 @@ async fn submits_fragments_when_required_count_accumulated() -> Result<()> { ..Default::default() }, setup.test_clock(), + noop_fee_tracker(), ); // when @@ -52,7 +61,7 @@ async fn submits_fragments_on_timeout_before_accumulation() -> Result<()> { let fragments = setup.insert_fragments(0, 5).await; // Only 5 fragments, less than required let tx_hash = [1; 32]; - let l1_mock_submit = test_helpers::mocks::l1::expects_state_submissions([( + let mut l1_mock_submit = test_helpers::mocks::l1::expects_state_submissions([( Some(NonEmpty::from_vec(fragments.clone()).unwrap()), L1Tx { hash: tx_hash, @@ -61,6 +70,9 @@ async fn submits_fragments_on_timeout_before_accumulation() -> Result<()> { }, )]); + l1_mock_submit + .expect_current_height() + .returning(|| Box::pin(async { Ok(0) })); let fuel_mock = test_helpers::mocks::fuel::latest_height_is(0); let mut state_committer = StateCommitter::new( l1_mock_submit, @@ -73,6 +85,7 @@ async fn submits_fragments_on_timeout_before_accumulation() -> Result<()> { ..Default::default() }, test_clock.clone(), + noop_fee_tracker(), ); // Advance time beyond the timeout @@ -108,6 +121,7 @@ async fn does_not_submit_fragments_before_required_count_or_timeout() -> Result< ..Default::default() }, test_clock.clone(), + noop_fee_tracker(), ); // Advance time less than the timeout @@ -129,7 +143,7 @@ async fn 
submits_fragments_when_required_count_before_timeout() -> Result<()> { let fragments = setup.insert_fragments(0, 5).await; let tx_hash = [3; 32]; - let l1_mock_submit = test_helpers::mocks::l1::expects_state_submissions([( + let mut l1_mock_submit = test_helpers::mocks::l1::expects_state_submissions([( Some(NonEmpty::from_vec(fragments).unwrap()), L1Tx { hash: tx_hash, @@ -137,6 +151,9 @@ async fn submits_fragments_when_required_count_before_timeout() -> Result<()> { ..Default::default() }, )]); + l1_mock_submit + .expect_current_height() + .returning(|| Box::pin(async { Ok(0) })); let fuel_mock = test_helpers::mocks::fuel::latest_height_is(0); let mut state_committer = StateCommitter::new( @@ -150,6 +167,7 @@ async fn submits_fragments_when_required_count_before_timeout() -> Result<()> { ..Default::default() }, setup.test_clock(), + noop_fee_tracker(), ); // when @@ -172,7 +190,7 @@ async fn timeout_measured_from_last_finalized_fragment() -> Result<()> { let fragments_to_submit = setup.insert_fragments(1, 2).await; let tx_hash = [4; 32]; - let l1_mock_submit = test_helpers::mocks::l1::expects_state_submissions([( + let mut l1_mock_submit = test_helpers::mocks::l1::expects_state_submissions([( Some(NonEmpty::from_vec(fragments_to_submit).unwrap()), L1Tx { hash: tx_hash, @@ -180,6 +198,9 @@ async fn timeout_measured_from_last_finalized_fragment() -> Result<()> { ..Default::default() }, )]); + l1_mock_submit + .expect_current_height() + .returning(|| Box::pin(async { Ok(1) })); let fuel_mock = test_helpers::mocks::fuel::latest_height_is(1); let mut state_committer = StateCommitter::new( @@ -193,6 +214,7 @@ async fn timeout_measured_from_last_finalized_fragment() -> Result<()> { ..Default::default() }, test_clock.clone(), + noop_fee_tracker(), ); // Advance time to exceed the timeout since last finalized fragment @@ -215,7 +237,7 @@ async fn timeout_measured_from_startup_if_no_finalized_fragment() -> Result<()> let fragments = setup.insert_fragments(0, 5).await; // Only 5 fragments, less than required let tx_hash = [5; 32]; - let l1_mock_submit = test_helpers::mocks::l1::expects_state_submissions([( + let mut l1_mock_submit = test_helpers::mocks::l1::expects_state_submissions([( Some(NonEmpty::from_vec(fragments.clone()).unwrap()), L1Tx { hash: tx_hash, @@ -225,6 +247,9 @@ async fn timeout_measured_from_startup_if_no_finalized_fragment() -> Result<()> )]); let fuel_mock = test_helpers::mocks::fuel::latest_height_is(0); + l1_mock_submit + .expect_current_height() + .returning(|| Box::pin(async { Ok(1) })); let mut state_committer = StateCommitter::new( l1_mock_submit, fuel_mock, @@ -236,6 +261,7 @@ async fn timeout_measured_from_startup_if_no_finalized_fragment() -> Result<()> ..Default::default() }, test_clock.clone(), + noop_fee_tracker(), ); // Advance time beyond the timeout from startup @@ -259,7 +285,7 @@ async fn resubmits_fragments_when_gas_bump_timeout_exceeded() -> Result<()> { let tx_hash_1 = [6; 32]; let tx_hash_2 = [7; 32]; - let l1_mock_submit = test_helpers::mocks::l1::expects_state_submissions([ + let mut l1_mock_submit = test_helpers::mocks::l1::expects_state_submissions([ ( Some(NonEmpty::from_vec(fragments.clone()).unwrap()), L1Tx { @@ -278,6 +304,10 @@ async fn resubmits_fragments_when_gas_bump_timeout_exceeded() -> Result<()> { ), ]); + l1_mock_submit + .expect_current_height() + .returning(|| Box::pin(async { Ok(0) })); + let fuel_mock = test_helpers::mocks::fuel::latest_height_is(0); let mut state_committer = StateCommitter::new( l1_mock_submit, @@ -288,9 +318,9 @@ async 
fn resubmits_fragments_when_gas_bump_timeout_exceeded() -> Result<()> { fragment_accumulation_timeout: Duration::from_secs(60), fragments_to_accumulate: 5.try_into().unwrap(), gas_bump_timeout: Duration::from_secs(60), - ..Default::default() }, test_clock.clone(), + noop_fee_tracker(), ); // Submit the initial fragments @@ -307,3 +337,429 @@ async fn resubmits_fragments_when_gas_bump_timeout_exceeded() -> Result<()> { // Mocks validate that the fragments have been sent again Ok(()) } + +#[tokio::test] +async fn sends_transaction_when_short_term_fee_favorable() -> Result<()> { + // Given + let setup = test_helpers::Setup::init().await; + + let fee_sequence = vec![ + ( + 0, + Fees { + base_fee_per_gas: 5000, + reward: 5000, + base_fee_per_blob_gas: 5000, + }, + ), + ( + 1, + Fees { + base_fee_per_gas: 5000, + reward: 5000, + base_fee_per_blob_gas: 5000, + }, + ), + ( + 2, + Fees { + base_fee_per_gas: 3000, + reward: 3000, + base_fee_per_blob_gas: 3000, + }, + ), + ( + 3, + Fees { + base_fee_per_gas: 3000, + reward: 3000, + base_fee_per_blob_gas: 3000, + }, + ), + ( + 4, + Fees { + base_fee_per_gas: 3000, + reward: 3000, + base_fee_per_blob_gas: 3000, + }, + ), + ( + 5, + Fees { + base_fee_per_gas: 3000, + reward: 3000, + base_fee_per_blob_gas: 3000, + }, + ), + ]; + + let fee_tracker_config = services::fee_tracker::service::Config { + sma_periods: SmaPeriods { + short: 2.try_into().unwrap(), + long: 6.try_into().unwrap(), + }, + fee_thresholds: FeeThresholds { + max_l2_blocks_behind: 100.try_into().unwrap(), + always_acceptable_fee: 0, + ..Default::default() + }, + }; + + // Insert enough fragments to meet the accumulation threshold + let fragments = setup.insert_fragments(0, 6).await; + + // Expect a state submission + let tx_hash = [0; 32]; + let mut l1_mock_submit = test_helpers::mocks::l1::expects_state_submissions([( + Some(NonEmpty::from_vec(fragments.clone()).unwrap()), + L1Tx { + hash: tx_hash, + nonce: 0, + ..Default::default() + }, + )]); + l1_mock_submit + .expect_current_height() + .returning(|| Box::pin(async { Ok(5) })); + + let fuel_mock = test_helpers::mocks::fuel::latest_height_is(6); + let mut state_committer = StateCommitter::new( + l1_mock_submit, + fuel_mock, + setup.db(), + StateCommitterConfig { + lookback_window: 1000, + fragment_accumulation_timeout: Duration::from_secs(60), + fragments_to_accumulate: 6.try_into().unwrap(), + ..Default::default() + }, + setup.test_clock(), + preconfigured_fee_tracker(fee_sequence, fee_tracker_config), + ); + + // When + state_committer.run().await?; + + // Then + // Mocks validate that the fragments have been sent + Ok(()) +} + +#[tokio::test] +async fn does_not_send_transaction_when_short_term_fee_unfavorable() -> Result<()> { + // given + let setup = test_helpers::Setup::init().await; + + // Define fee sequence: last 2 blocks have higher fees than the long-term average + let fee_sequence = vec![ + ( + 0, + Fees { + base_fee_per_gas: 3000, + reward: 3000, + base_fee_per_blob_gas: 3000, + }, + ), + ( + 1, + Fees { + base_fee_per_gas: 3000, + reward: 3000, + base_fee_per_blob_gas: 3000, + }, + ), + ( + 2, + Fees { + base_fee_per_gas: 5000, + reward: 5000, + base_fee_per_blob_gas: 5000, + }, + ), + ( + 3, + Fees { + base_fee_per_gas: 5000, + reward: 5000, + base_fee_per_blob_gas: 5000, + }, + ), + ( + 4, + Fees { + base_fee_per_gas: 5000, + reward: 5000, + base_fee_per_blob_gas: 5000, + }, + ), + ( + 5, + Fees { + base_fee_per_gas: 5000, + reward: 5000, + base_fee_per_blob_gas: 5000, + }, + ), + ]; + + let fee_tracker_config = 
FeeTrackerConfig { + sma_periods: SmaPeriods { + short: 2.try_into().unwrap(), + long: 6.try_into().unwrap(), + }, + fee_thresholds: FeeThresholds { + max_l2_blocks_behind: 100.try_into().unwrap(), + always_acceptable_fee: 0, + ..Default::default() + }, + }; + + // Insert enough fragments to meet the accumulation threshold + let _fragments = setup.insert_fragments(0, 6).await; + + let mut l1_mock = test_helpers::mocks::l1::expects_state_submissions([]); + l1_mock + .expect_current_height() + .returning(|| Box::pin(async { Ok(5) })); + + let fuel_mock = test_helpers::mocks::fuel::latest_height_is(6); + let mut state_committer = StateCommitter::new( + l1_mock, + fuel_mock, + setup.db(), + StateCommitterConfig { + lookback_window: 1000, + fragment_accumulation_timeout: Duration::from_secs(60), + fragments_to_accumulate: 6.try_into().unwrap(), + ..Default::default() + }, + setup.test_clock(), + preconfigured_fee_tracker(fee_sequence, fee_tracker_config), + ); + + // when + state_committer.run().await?; + + // then + // Mocks validate that no fragments have been sent + Ok(()) +} + +#[tokio::test] +async fn sends_transaction_when_l2_blocks_behind_exceeds_max() -> Result<()> { + // given + let setup = test_helpers::Setup::init().await; + + // Define fee sequence with high fees to ensure that without the behind condition, it wouldn't send + let fee_sequence = vec![ + ( + 0, + Fees { + base_fee_per_gas: 7000, + reward: 7000, + base_fee_per_blob_gas: 7000, + }, + ), + ( + 1, + Fees { + base_fee_per_gas: 7000, + reward: 7000, + base_fee_per_blob_gas: 7000, + }, + ), + ( + 2, + Fees { + base_fee_per_gas: 7000, + reward: 7000, + base_fee_per_blob_gas: 7000, + }, + ), + ( + 3, + Fees { + base_fee_per_gas: 7000, + reward: 7000, + base_fee_per_blob_gas: 7000, + }, + ), + ( + 4, + Fees { + base_fee_per_gas: 7000, + reward: 7000, + base_fee_per_blob_gas: 7000, + }, + ), + ( + 5, + Fees { + base_fee_per_gas: 7000, + reward: 7000, + base_fee_per_blob_gas: 7000, + }, + ), + ]; + + let fee_tracker_config = FeeTrackerConfig { + sma_periods: SmaPeriods { + short: 2.try_into().unwrap(), + long: 6.try_into().unwrap(), + }, + fee_thresholds: FeeThresholds { + max_l2_blocks_behind: 50.try_into().unwrap(), + always_acceptable_fee: 0, + ..Default::default() + }, + }; + + // Insert enough fragments to meet the accumulation threshold + let fragments = setup.insert_fragments(0, 6).await; + + // Expect a state submission despite high fees because blocks behind exceed max + let tx_hash = [0; 32]; + let mut l1_mock_submit = test_helpers::mocks::l1::expects_state_submissions([( + Some(NonEmpty::from_vec(fragments.clone()).unwrap()), + L1Tx { + hash: tx_hash, + nonce: 0, + ..Default::default() + }, + )]); + l1_mock_submit + .expect_current_height() + .returning(|| Box::pin(async { Ok(5) })); + + let fuel_mock = test_helpers::mocks::fuel::latest_height_is(50); // L2 height is 50, behind by 50 + let mut state_committer = StateCommitter::new( + l1_mock_submit, + fuel_mock, + setup.db(), + StateCommitterConfig { + lookback_window: 1000, + fragment_accumulation_timeout: Duration::from_secs(60), + fragments_to_accumulate: 6.try_into().unwrap(), + ..Default::default() + }, + setup.test_clock(), + preconfigured_fee_tracker(fee_sequence, fee_tracker_config), + ); + + // when + state_committer.run().await?; + + // then + // Mocks validate that the fragments have been sent despite high fees + Ok(()) +} + +#[tokio::test] +async fn sends_transaction_when_nearing_max_blocks_behind_with_increased_tolerance() -> Result<()> { + // given + let 
setup = test_helpers::Setup::init().await; + + let fee_sequence = vec![ + ( + 95, + Fees { + base_fee_per_gas: 5000, + reward: 5000, + base_fee_per_blob_gas: 5000, + }, + ), + ( + 96, + Fees { + base_fee_per_gas: 5000, + reward: 5000, + base_fee_per_blob_gas: 5000, + }, + ), + ( + 97, + Fees { + base_fee_per_gas: 5000, + reward: 5000, + base_fee_per_blob_gas: 5000, + }, + ), + ( + 98, + Fees { + base_fee_per_gas: 5000, + reward: 5000, + base_fee_per_blob_gas: 5000, + }, + ), + ( + 99, + Fees { + base_fee_per_gas: 5800, + reward: 5800, + base_fee_per_blob_gas: 5800, + }, + ), + ( + 100, + Fees { + base_fee_per_gas: 5800, + reward: 5800, + base_fee_per_blob_gas: 5800, + }, + ), + ]; + + let fee_tracker_config = services::fee_tracker::service::Config { + sma_periods: SmaPeriods { + short: 2.try_into().unwrap(), + long: 5.try_into().unwrap(), + }, + fee_thresholds: FeeThresholds { + max_l2_blocks_behind: 100.try_into().unwrap(), + start_discount_percentage: 0.20.try_into().unwrap(), + end_premium_percentage: 0.20.try_into().unwrap(), + always_acceptable_fee: 0, + }, + }; + + let fragments = setup.insert_fragments(0, 6).await; + + // Expect a state submission due to nearing max blocks behind and increased tolerance + let tx_hash = [0; 32]; + let mut l1_mock_submit = test_helpers::mocks::l1::expects_state_submissions([( + Some(NonEmpty::from_vec(fragments.clone()).unwrap()), + L1Tx { + hash: tx_hash, + nonce: 0, + ..Default::default() + }, + )]); + l1_mock_submit + .expect_current_height() + .returning(|| Box::pin(async { Ok(100) })); + + let fuel_mock = test_helpers::mocks::fuel::latest_height_is(80); + + let mut state_committer = StateCommitter::new( + l1_mock_submit, + fuel_mock, + setup.db(), + StateCommitterConfig { + lookback_window: 1000, + fragment_accumulation_timeout: Duration::from_secs(60), + fragments_to_accumulate: 6.try_into().unwrap(), + ..Default::default() + }, + setup.test_clock(), + preconfigured_fee_tracker(fee_sequence, fee_tracker_config), + ); + + // when + state_committer.run().await?; + + // then + // Mocks validate that the fragments have been sent due to increased tolerance from nearing max blocks behind + Ok(()) +} diff --git a/packages/services/tests/state_listener.rs b/packages/services/tests/state_listener.rs index b60460cc..5777b8da 100644 --- a/packages/services/tests/state_listener.rs +++ b/packages/services/tests/state_listener.rs @@ -8,7 +8,10 @@ use services::{ Result, Runner, StateCommitter, StateCommitterConfig, }; use test_case::test_case; -use test_helpers::mocks::{self, l1::TxStatus}; +use test_helpers::{ + mocks::{self, l1::TxStatus}, + noop_fee_tracker, +}; #[tokio::test] async fn successful_finalized_tx() -> Result<()> { @@ -438,8 +441,14 @@ async fn block_inclusion_of_replacement_leaves_no_pending_txs() -> Result<()> { nonce, ..Default::default() }; + let mut l1_mock = + mocks::l1::expects_state_submissions(vec![(None, orig_tx), (None, replacement_tx)]); + l1_mock + .expect_current_height() + .returning(|| Box::pin(async { Ok(0) })); + let mut committer = StateCommitter::new( - mocks::l1::expects_state_submissions(vec![(None, orig_tx), (None, replacement_tx)]), + l1_mock, mocks::fuel::latest_height_is(0), setup.db(), StateCommitterConfig { @@ -447,6 +456,7 @@ async fn block_inclusion_of_replacement_leaves_no_pending_txs() -> Result<()> { ..Default::default() }, test_clock.clone(), + noop_fee_tracker(), ); // Orig tx @@ -535,8 +545,14 @@ async fn finalized_replacement_tx_will_leave_no_pending_tx( ..Default::default() }; + let mut l1_mock = + 
mocks::l1::expects_state_submissions(vec![(None, orig_tx), (None, replacement_tx)]); + l1_mock + .expect_current_height() + .returning(|| Box::pin(async { Ok(0) })); + let mut committer = StateCommitter::new( - mocks::l1::expects_state_submissions(vec![(None, orig_tx), (None, replacement_tx)]), + l1_mock, mocks::fuel::latest_height_is(0), setup.db(), crate::StateCommitterConfig { @@ -544,6 +560,7 @@ async fn finalized_replacement_tx_will_leave_no_pending_tx( ..Default::default() }, test_clock.clone(), + noop_fee_tracker(), ); // Orig tx diff --git a/packages/test-helpers/src/lib.rs b/packages/test-helpers/src/lib.rs index f73fc475..2e0f6a72 100644 --- a/packages/test-helpers/src/lib.rs +++ b/packages/test-helpers/src/lib.rs @@ -8,6 +8,9 @@ use fuel_block_committer_encoding::bundle::{self, CompressionLevel}; use metrics::prometheus::IntGauge; use mocks::l1::TxStatus; use rand::{Rng, RngCore}; +use services::fee_tracker::port::l1::testing::{ConstantFeeApi, PreconfiguredFeeApi}; +use services::fee_tracker::port::l1::Fees; +use services::fee_tracker::service::FeeTracker; use services::types::{ BlockSubmission, CollectNonEmpty, CompressedFuelBlock, Fragment, L1Tx, NonEmpty, }; @@ -484,6 +487,17 @@ pub mod mocks { } } +pub fn noop_fee_tracker() -> FeeTracker { + FeeTracker::new(ConstantFeeApi::new(Fees::default()), Default::default()) +} + +pub fn preconfigured_fee_tracker( + fee_sequence: impl IntoIterator, + config: services::fee_tracker::service::Config, +) -> FeeTracker { + FeeTracker::new(PreconfiguredFeeApi::new(fee_sequence), config) +} + pub struct Setup { db: DbWithProcess, test_clock: TestClock, @@ -531,15 +545,20 @@ impl Setup { } pub async fn send_fragments(&self, eth_tx: [u8; 32], eth_nonce: u32) { + let mut l1_mock = mocks::l1::expects_state_submissions(vec![( + None, + L1Tx { + hash: eth_tx, + nonce: eth_nonce, + ..Default::default() + }, + )]); + l1_mock + .expect_current_height() + .return_once(move || Box::pin(async { Ok(0) })); + StateCommitter::new( - mocks::l1::expects_state_submissions(vec![( - None, - L1Tx { - hash: eth_tx, - nonce: eth_nonce, - ..Default::default() - }, - )]), + l1_mock, mocks::fuel::latest_height_is(0), self.db(), services::StateCommitterConfig { @@ -547,9 +566,10 @@ impl Setup { fragment_accumulation_timeout: Duration::from_secs(0), fragments_to_accumulate: 1.try_into().unwrap(), gas_bump_timeout: Duration::from_secs(300), - tx_max_fee: 1_000_000_000, + ..Default::default() }, self.test_clock.clone(), + noop_fee_tracker(), ) .run() .await @@ -563,7 +583,7 @@ impl Setup { pub async fn commit_block_bundle(&self, eth_tx: [u8; 32], eth_nonce: u32, height: u32) { self.insert_fragments(height, 6).await; - let l1_mock = mocks::l1::expects_state_submissions(vec![( + let mut l1_mock = mocks::l1::expects_state_submissions(vec![( None, L1Tx { hash: eth_tx, @@ -571,6 +591,10 @@ impl Setup { ..Default::default() }, )]); + l1_mock + .expect_current_height() + .return_once(move || Box::pin(async { Ok(0) })); + let fuel_mock = mocks::fuel::latest_height_is(height); let mut committer = StateCommitter::new( l1_mock, @@ -581,9 +605,9 @@ impl Setup { fragment_accumulation_timeout: Duration::from_secs(0), fragments_to_accumulate: 1.try_into().unwrap(), gas_bump_timeout: Duration::from_secs(300), - tx_max_fee: 1_000_000_000, }, self.test_clock.clone(), + noop_fee_tracker(), ); committer.run().await.unwrap(); diff --git a/run_tests.sh b/run_tests.sh index 3280743e..9c074973 100755 --- a/run_tests.sh +++ b/run_tests.sh @@ -9,4 +9,4 @@ cargo test --manifest-path 
"$workspace_cargo_manifest" --workspace --exclude e2e # So that we may have a binary in `target/release` cargo build --release --manifest-path "$workspace_cargo_manifest" --bin fuel-block-committer -PATH="$script_location/target/release:$PATH" cargo test --manifest-path "$workspace_cargo_manifest" --package e2e -- --test-threads=1 +PATH="$script_location/target/release:$PATH" cargo test --manifest-path "$workspace_cargo_manifest" --package e2e -- --test-threads=1 --nocapture