diff --git a/CHANGELOG.md b/CHANGELOG.md index c7a6b2d87a9..372668f9554 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - [2485](https://github.com/FuelLabs/fuel-core/pull/2485): Hardcode the timestamp of the genesis block and version of `tai64` to avoid breaking changes for us. ### Changed +- [2468](https://github.com/FuelLabs/fuel-core/pull/2468): Abstract unrecorded blocks concept for V1 algorithm, create new storage impl. Introduce `TransactionableStorage` trait to allow atomic changes to the storage. - [2295](https://github.com/FuelLabs/fuel-core/pull/2295): `CombinedDb::from_config` now respects `state_rewind_policy` with tmp RocksDB. - [2378](https://github.com/FuelLabs/fuel-core/pull/2378): Use cached hash of the topic instead of calculating it on each publishing gossip message. - [2429](https://github.com/FuelLabs/fuel-core/pull/2429): Introduce custom enum for representing result of running service tasks diff --git a/crates/fuel-core/src/service/sub_services.rs b/crates/fuel-core/src/service/sub_services.rs index 4fd45534b24..03003319235 100644 --- a/crates/fuel-core/src/service/sub_services.rs +++ b/crates/fuel-core/src/service/sub_services.rs @@ -182,7 +182,7 @@ pub fn init_sub_services( let genesis_block_height = *genesis_block.header().height(); let settings = consensus_parameters_provider.clone(); let block_stream = importer_adapter.events_shared_result(); - let metadata = StructuredStorage::new(database.gas_price().clone()); + let metadata = database.gas_price().clone(); let gas_price_service_v0 = new_gas_price_service_v0( config.clone().into(), @@ -190,7 +190,7 @@ pub fn init_sub_services( settings, block_stream, database.gas_price().clone(), - metadata, + StructuredStorage::new(metadata), database.on_chain().clone(), )?; diff --git a/crates/fuel-gas-price-algorithm/src/v1.rs b/crates/fuel-gas-price-algorithm/src/v1.rs index 2464f196a09..ac670d7b38f 100644 --- a/crates/fuel-gas-price-algorithm/src/v1.rs +++ b/crates/fuel-gas-price-algorithm/src/v1.rs @@ -19,6 +19,10 @@ pub enum Error { FailedToIncludeL2BlockData(String), #[error("L2 block expected but not found in unrecorded blocks: {height}")] L2BlockExpectedNotFound { height: u32 }, + #[error("Could not insert unrecorded block: {0}")] + CouldNotInsertUnrecordedBlock(String), + #[error("Could not remove unrecorded block: {0}")] + CouldNotRemoveUnrecordedBlock(String), } // TODO: separate exec gas price and DA gas price into newtypes for clarity @@ -59,6 +63,27 @@ impl AlgorithmV1 { } } +pub type Height = u32; +pub type Bytes = u64; + +pub trait UnrecordedBlocks { + fn insert(&mut self, height: Height, bytes: Bytes) -> Result<(), String>; + + fn remove(&mut self, height: &Height) -> Result, String>; +} + +impl UnrecordedBlocks for BTreeMap { + fn insert(&mut self, height: Height, bytes: Bytes) -> Result<(), String> { + self.insert(height, bytes); + Ok(()) + } + + fn remove(&mut self, height: &Height) -> Result, String> { + let value = self.remove(height); + Ok(value) + } +} + /// The state of the algorithm used to update the gas price algorithm for each block /// /// Because there will always be a delay between blocks submitted to the L2 chain and the blocks @@ -96,8 +121,6 @@ impl AlgorithmV1 { /// The DA portion also uses a moving average of the profits over the last `avg_window` blocks /// instead of the actual profit. Setting the `avg_window` to 1 will effectively disable the /// moving average. -type Height = u32; -type Bytes = u64; #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)] pub struct AlgorithmUpdaterV1 { // Execution @@ -143,8 +166,6 @@ pub struct AlgorithmUpdaterV1 { pub latest_da_cost_per_byte: u128, /// Activity of L2 pub l2_activity: L2ActivityTracker, - /// The unrecorded blocks that are used to calculate the projected cost of recording blocks - pub unrecorded_blocks: BTreeMap, /// Total unrecorded block bytes pub unrecorded_blocks_bytes: u128, } @@ -269,10 +290,28 @@ impl L2ActivityTracker { pub fn current_activity(&self) -> u16 { self.chain_activity } + + pub fn max_activity(&self) -> u16 { + self.max_activity + } + + pub fn capped_activity_threshold(&self) -> u16 { + self.capped_activity_threshold + } + + pub fn decrease_activity_threshold(&self) -> u16 { + self.decrease_activity_threshold + } + + pub fn block_activity_threshold(&self) -> ClampedPercentage { + self.block_activity_threshold + } } /// A value that represents a value between 0 and 100. Higher values are clamped to 100 -#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, PartialOrd)] +#[derive( + serde::Serialize, serde::Deserialize, Debug, Copy, Clone, PartialEq, PartialOrd, +)] pub struct ClampedPercentage { value: u8, } @@ -300,27 +339,34 @@ impl core::ops::Deref for ClampedPercentage { } impl AlgorithmUpdaterV1 { - pub fn update_da_record_data( + pub fn update_da_record_data( &mut self, heights: &[u32], recorded_bytes: u32, recording_cost: u128, + unrecorded_blocks: &mut U, ) -> Result<(), Error> { if !heights.is_empty() { - self.da_block_update(heights, recorded_bytes as u128, recording_cost)?; + self.da_block_update( + heights, + recorded_bytes as u128, + recording_cost, + unrecorded_blocks, + )?; self.recalculate_projected_cost(); self.update_da_gas_price(); } Ok(()) } - pub fn update_l2_block_data( + pub fn update_l2_block_data( &mut self, height: u32, used: u64, capacity: NonZeroU64, block_bytes: u64, fee_wei: u128, + unrecorded_blocks: &mut U, ) -> Result<(), Error> { let expected = self.l2_block_height.saturating_add(1); if height != expected { @@ -351,7 +397,9 @@ impl AlgorithmUpdaterV1 { self.update_da_gas_price(); // metadata - self.unrecorded_blocks.insert(height, block_bytes); + unrecorded_blocks + .insert(height, block_bytes) + .map_err(Error::CouldNotInsertUnrecordedBlock)?; self.unrecorded_blocks_bytes = self .unrecorded_blocks_bytes .saturating_add(block_bytes as u128); @@ -512,13 +560,14 @@ impl AlgorithmUpdaterV1 { .saturating_div(100) } - fn da_block_update( + fn da_block_update( &mut self, heights: &[u32], recorded_bytes: u128, recording_cost: u128, + unrecorded_blocks: &mut U, ) -> Result<(), Error> { - self.update_unrecorded_block_bytes(heights); + self.update_unrecorded_block_bytes(heights, unrecorded_blocks)?; let new_da_block_cost = self .latest_known_total_da_cost_excess @@ -540,10 +589,17 @@ impl AlgorithmUpdaterV1 { // Get the bytes for all specified heights, or get none of them. // Always remove the blocks from the unrecorded blocks so they don't build up indefinitely - fn update_unrecorded_block_bytes(&mut self, heights: &[u32]) { + fn update_unrecorded_block_bytes( + &mut self, + heights: &[u32], + unrecorded_blocks: &mut U, + ) -> Result<(), Error> { let mut total: u128 = 0; for expected_height in heights { - let maybe_bytes = self.unrecorded_blocks.remove(expected_height); + let maybe_bytes = unrecorded_blocks + .remove(expected_height) + .map_err(Error::CouldNotRemoveUnrecordedBlock)?; + if let Some(bytes) = maybe_bytes { total = total.saturating_add(bytes as u128); } else { @@ -554,6 +610,8 @@ impl AlgorithmUpdaterV1 { } } self.unrecorded_blocks_bytes = self.unrecorded_blocks_bytes.saturating_sub(total); + + Ok(()) } fn recalculate_projected_cost(&mut self) { diff --git a/crates/fuel-gas-price-algorithm/src/v1/tests.rs b/crates/fuel-gas-price-algorithm/src/v1/tests.rs index 9e3dc879651..c2dcaba36fa 100644 --- a/crates/fuel-gas-price-algorithm/src/v1/tests.rs +++ b/crates/fuel-gas-price-algorithm/src/v1/tests.rs @@ -4,8 +4,11 @@ use crate::v1::{ AlgorithmUpdaterV1, + Bytes, + Height, L2ActivityTracker, }; +use std::collections::BTreeMap; #[cfg(test)] mod algorithm_v1_tests; @@ -38,7 +41,7 @@ pub struct UpdaterBuilder { da_cost_per_byte: u128, project_total_cost: u128, latest_known_total_cost: u128, - unrecorded_blocks: Vec, + unrecorded_blocks_bytes: u64, last_profit: i128, second_to_last_profit: i128, da_gas_price_factor: u64, @@ -65,7 +68,7 @@ impl UpdaterBuilder { da_cost_per_byte: 0, project_total_cost: 0, latest_known_total_cost: 0, - unrecorded_blocks: vec![], + unrecorded_blocks_bytes: 0, last_profit: 0, second_to_last_profit: 0, da_gas_price_factor: 1, @@ -146,8 +149,14 @@ impl UpdaterBuilder { self } - fn with_unrecorded_blocks(mut self, unrecorded_blocks: Vec) -> Self { - self.unrecorded_blocks = unrecorded_blocks; + fn with_unrecorded_blocks( + mut self, + unrecorded_blocks: &BTreeMap, + ) -> Self { + let unrecorded_block_bytes = unrecorded_blocks + .iter() + .fold(0u64, |acc, (_, bytes)| acc + bytes); + self.unrecorded_blocks_bytes = unrecorded_block_bytes; self } @@ -180,11 +189,6 @@ impl UpdaterBuilder { latest_da_cost_per_byte: self.da_cost_per_byte, projected_total_da_cost: self.project_total_cost, latest_known_total_da_cost_excess: self.latest_known_total_cost, - unrecorded_blocks: self - .unrecorded_blocks - .iter() - .map(|b| (b.height, b.block_bytes)) - .collect(), last_profit: self.last_profit, second_to_last_profit: self.second_to_last_profit, min_da_gas_price: self.min_da_gas_price, @@ -193,10 +197,7 @@ impl UpdaterBuilder { .try_into() .expect("Should never be non-zero"), l2_activity: self.l2_activity, - unrecorded_blocks_bytes: self - .unrecorded_blocks - .iter() - .fold(0u128, |acc, b| acc + u128::from(b.block_bytes)), + unrecorded_blocks_bytes: self.unrecorded_blocks_bytes as u128, } } } diff --git a/crates/fuel-gas-price-algorithm/src/v1/tests/update_da_record_data_tests.rs b/crates/fuel-gas-price-algorithm/src/v1/tests/update_da_record_data_tests.rs index 4b87b0d4ad0..6d1feb220b4 100644 --- a/crates/fuel-gas-price-algorithm/src/v1/tests/update_da_record_data_tests.rs +++ b/crates/fuel-gas-price-algorithm/src/v1/tests/update_da_record_data_tests.rs @@ -1,7 +1,5 @@ -use crate::v1::tests::{ - BlockBytes, - UpdaterBuilder, -}; +use crate::v1::tests::UpdaterBuilder; +use std::collections::BTreeMap; #[test] fn update_da_record_data__if_receives_batch_with_unknown_blocks_will_include_known_blocks_with_previous_cost( @@ -11,14 +9,11 @@ fn update_da_record_data__if_receives_batch_with_unknown_blocks_will_include_kno let recorded_cost = 1_000_000; let recorded_bytes = 500; let block_bytes = 1000; - let unrecorded_blocks = vec![BlockBytes { - height: 1, - block_bytes, - }]; + let mut unrecorded_blocks: BTreeMap<_, _> = [(1, block_bytes)].into_iter().collect(); let cost_per_byte = 333; let known_total_cost = 10_000; let mut updater = UpdaterBuilder::new() - .with_unrecorded_blocks(unrecorded_blocks) + .with_unrecorded_blocks(&unrecorded_blocks) .with_da_cost_per_byte(cost_per_byte) .with_known_total_cost(known_total_cost) .build(); @@ -26,7 +21,12 @@ fn update_da_record_data__if_receives_batch_with_unknown_blocks_will_include_kno // when updater - .update_da_record_data(&recorded_heights, recorded_bytes, recorded_cost) + .update_da_record_data( + &recorded_heights, + recorded_bytes, + recorded_cost, + &mut unrecorded_blocks, + ) .unwrap(); // then @@ -45,14 +45,11 @@ fn update_da_record_data__if_receives_batch_with_unknown_blocks_will_never_incre let recorded_cost = 200; let block_bytes = 1000; let recorded_bytes = 500; - let unrecorded_blocks = vec![BlockBytes { - height: 1, - block_bytes, - }]; + let mut unrecorded_blocks: BTreeMap<_, _> = [(1, block_bytes)].into_iter().collect(); let cost_per_byte = 333; let known_total_cost = 10_000; let mut updater = UpdaterBuilder::new() - .with_unrecorded_blocks(unrecorded_blocks) + .with_unrecorded_blocks(&unrecorded_blocks) .with_da_cost_per_byte(cost_per_byte) .with_known_total_cost(known_total_cost) .build(); @@ -60,7 +57,12 @@ fn update_da_record_data__if_receives_batch_with_unknown_blocks_will_never_incre // when updater - .update_da_record_data(&recorded_heights, recorded_bytes, recorded_cost) + .update_da_record_data( + &recorded_heights, + recorded_bytes, + recorded_cost, + &mut unrecorded_blocks, + ) .unwrap(); // then @@ -76,13 +78,10 @@ fn update_da_record_data__updates_cost_per_byte() { // given let da_cost_per_byte = 20; let block_bytes = 1000; - let unrecorded_blocks = vec![BlockBytes { - height: 1, - block_bytes, - }]; + let mut unrecorded_blocks: BTreeMap<_, _> = [(1, block_bytes)].into_iter().collect(); let mut updater = UpdaterBuilder::new() .with_da_cost_per_byte(da_cost_per_byte) - .with_unrecorded_blocks(unrecorded_blocks) + .with_unrecorded_blocks(&unrecorded_blocks) .build(); let new_cost_per_byte = 100; @@ -91,7 +90,12 @@ fn update_da_record_data__updates_cost_per_byte() { let recorded_heights: Vec = (1u32..2).collect(); // when updater - .update_da_record_data(&recorded_heights, recorded_bytes, recorded_cost) + .update_da_record_data( + &recorded_heights, + recorded_bytes, + recorded_cost, + &mut unrecorded_blocks, + ) .unwrap(); // then @@ -107,26 +111,14 @@ fn update_da_record_data__updates_known_total_cost() { let l2_block_height = 15; let projected_total_cost = 2000; let known_total_cost = 1500; - let unrecorded_blocks = vec![ - BlockBytes { - height: 11, - block_bytes: 1000, - }, - BlockBytes { - height: 12, - block_bytes: 2000, - }, - BlockBytes { - height: 13, - block_bytes: 1500, - }, - ]; + let mut unrecorded_blocks: BTreeMap<_, _> = + [(11, 1000), (12, 2000), (13, 1500)].into_iter().collect(); let mut updater = UpdaterBuilder::new() .with_da_cost_per_byte(da_cost_per_byte) .with_l2_block_height(l2_block_height) .with_projected_total_cost(projected_total_cost) .with_known_total_cost(known_total_cost) - .with_unrecorded_blocks(unrecorded_blocks) + .with_unrecorded_blocks(&unrecorded_blocks) .build(); let recorded_heights: Vec = (11u32..14).collect(); @@ -134,7 +126,12 @@ fn update_da_record_data__updates_known_total_cost() { let recorded_cost = 300; // when updater - .update_da_record_data(&recorded_heights, recorded_bytes, recorded_cost) + .update_da_record_data( + &recorded_heights, + recorded_bytes, + recorded_cost, + &mut unrecorded_blocks, + ) .unwrap(); // then @@ -149,24 +146,12 @@ fn update_da_record_data__if_da_height_matches_l2_height_projected_and_known_mat let da_cost_per_byte = 20; let l2_block_height = 13; let known_total_cost = 1500; - let unrecorded_blocks = vec![ - BlockBytes { - height: 11, - block_bytes: 1000, - }, - BlockBytes { - height: 12, - block_bytes: 2000, - }, - BlockBytes { - height: 13, - block_bytes: 1500, - }, - ]; + let mut unrecorded_blocks: BTreeMap<_, _> = + [(11, 1000), (12, 2000), (13, 1500)].into_iter().collect(); let guessed_cost: u64 = unrecorded_blocks - .iter() - .map(|block| block.block_bytes * da_cost_per_byte) + .values() + .map(|bytes| bytes * da_cost_per_byte) .sum(); let projected_total_cost = known_total_cost + guessed_cost; let mut updater = UpdaterBuilder::new() @@ -174,7 +159,7 @@ fn update_da_record_data__if_da_height_matches_l2_height_projected_and_known_mat .with_l2_block_height(l2_block_height) .with_projected_total_cost(projected_total_cost as u128) .with_known_total_cost(known_total_cost as u128) - .with_unrecorded_blocks(unrecorded_blocks) + .with_unrecorded_blocks(&unrecorded_blocks) .build(); let block_bytes = 1000; @@ -186,11 +171,16 @@ fn update_da_record_data__if_da_height_matches_l2_height_projected_and_known_mat let recorded_cost = block_cost * 3; // when updater - .update_da_record_data(&recorded_heights, recorded_bytes, recorded_cost) + .update_da_record_data( + &recorded_heights, + recorded_bytes, + recorded_cost, + &mut unrecorded_blocks, + ) .unwrap(); // then - assert_eq!(updater.unrecorded_blocks.len(), 0); + assert_eq!(unrecorded_blocks.len(), 0); assert_eq!( updater.projected_total_da_cost, updater.latest_known_total_da_cost_excess @@ -205,35 +195,16 @@ fn update_da_record_data__da_block_updates_projected_total_cost_with_known_and_g let l2_block_height = 15; let original_known_total_cost: u128 = 1500; let block_bytes = 1000; - let mut unrecorded_blocks = vec![ - BlockBytes { - height: 11, - block_bytes: 1000, - }, - BlockBytes { - height: 12, - block_bytes: 2000, - }, - BlockBytes { - height: 13, - block_bytes: 1500, - }, - ]; + let remaining = vec![(14, block_bytes), (15, block_bytes)]; + let mut pairs = vec![(11, 1000), (12, 2000), (13, 1500)]; + + pairs.extend(remaining.clone()); + + let mut unrecorded_blocks: BTreeMap<_, _> = pairs.into_iter().collect(); - let remaining = vec![ - BlockBytes { - height: 14, - block_bytes, - }, - BlockBytes { - height: 15, - block_bytes, - }, - ]; - unrecorded_blocks.extend(remaining.clone()); let guessed_cost: u128 = unrecorded_blocks - .iter() - .map(|block| block.block_bytes as u128 * da_cost_per_byte) + .values() + .map(|bytes| *bytes as u128 * da_cost_per_byte) .sum(); let projected_total_cost: u128 = original_known_total_cost + guessed_cost; let mut updater = UpdaterBuilder::new() @@ -241,7 +212,7 @@ fn update_da_record_data__da_block_updates_projected_total_cost_with_known_and_g .with_l2_block_height(l2_block_height) .with_projected_total_cost(projected_total_cost) .with_known_total_cost(original_known_total_cost) - .with_unrecorded_blocks(unrecorded_blocks) + .with_unrecorded_blocks(&unrecorded_blocks) .build(); let new_cost_per_byte = 100; @@ -252,7 +223,12 @@ fn update_da_record_data__da_block_updates_projected_total_cost_with_known_and_g // when updater - .update_da_record_data(&recorded_heights, recorded_bytes, recorded_cost) + .update_da_record_data( + &recorded_heights, + recorded_bytes, + recorded_cost, + &mut unrecorded_blocks, + ) .unwrap(); // then @@ -260,7 +236,7 @@ fn update_da_record_data__da_block_updates_projected_total_cost_with_known_and_g let new_known_total_cost = original_known_total_cost + recorded_cost; let guessed_part: u128 = remaining .iter() - .map(|block| block.block_bytes as u128 * new_cost_per_byte) + .map(|(_, bytes)| *bytes as u128 * new_cost_per_byte) .sum(); let expected = new_known_total_cost + guessed_part; assert_eq!(actual, expected); @@ -271,27 +247,17 @@ fn update_da_record_data__updates_known_total_cost_if_blocks_are_out_of_order() // given let da_cost_per_byte = 20; let block_bytes = 1000; - let unrecorded_blocks = vec![ - BlockBytes { - height: 1, - block_bytes, - }, - BlockBytes { - height: 2, - block_bytes, - }, - BlockBytes { - height: 3, - block_bytes, - }, - ]; + let mut unrecorded_blocks: BTreeMap<_, _> = + [(1, block_bytes), (2, block_bytes), (3, block_bytes)] + .into_iter() + .collect(); let old_known_total_cost = 500; let old_projected_total_cost = old_known_total_cost + (block_bytes as u128 * da_cost_per_byte * 3); let old_da_cost_per_byte = 20; let mut updater = UpdaterBuilder::new() .with_da_cost_per_byte(da_cost_per_byte) - .with_unrecorded_blocks(unrecorded_blocks) + .with_unrecorded_blocks(&unrecorded_blocks) .with_da_cost_per_byte(old_da_cost_per_byte) .with_known_total_cost(old_known_total_cost) .with_projected_total_cost(old_projected_total_cost) @@ -303,7 +269,12 @@ fn update_da_record_data__updates_known_total_cost_if_blocks_are_out_of_order() // when updater - .update_da_record_data(&recorded_heights, recorded_bytes, recorded_cost as u128) + .update_da_record_data( + &recorded_heights, + recorded_bytes, + recorded_cost as u128, + &mut unrecorded_blocks, + ) .unwrap(); // then @@ -318,27 +289,17 @@ fn update_da_record_data__updates_projected_total_cost_if_blocks_are_out_of_orde // given let da_cost_per_byte = 20; let block_bytes = 1000; - let unrecorded_blocks = vec![ - BlockBytes { - height: 1, - block_bytes, - }, - BlockBytes { - height: 2, - block_bytes, - }, - BlockBytes { - height: 3, - block_bytes, - }, - ]; + let mut unrecorded_blocks: BTreeMap<_, _> = + [(1, block_bytes), (2, block_bytes), (3, block_bytes)] + .into_iter() + .collect(); let old_known_total_cost = 500; let old_projected_total_cost = old_known_total_cost + (block_bytes as u128 * da_cost_per_byte * 3); let old_da_cost_per_byte = 20; let mut updater = UpdaterBuilder::new() .with_da_cost_per_byte(da_cost_per_byte) - .with_unrecorded_blocks(unrecorded_blocks) + .with_unrecorded_blocks(&unrecorded_blocks) .with_da_cost_per_byte(old_da_cost_per_byte) .with_known_total_cost(old_known_total_cost) .with_projected_total_cost(old_projected_total_cost) @@ -350,7 +311,12 @@ fn update_da_record_data__updates_projected_total_cost_if_blocks_are_out_of_orde // when updater - .update_da_record_data(&recorded_heights, recorded_bytes, recorded_cost as u128) + .update_da_record_data( + &recorded_heights, + recorded_bytes, + recorded_cost as u128, + &mut unrecorded_blocks, + ) .unwrap(); // then @@ -365,23 +331,13 @@ fn update_da_record_data__updates_unrecorded_blocks() { // given let da_cost_per_byte = 20; let block_bytes = 1000; - let unrecorded_blocks = vec![ - BlockBytes { - height: 1, - block_bytes, - }, - BlockBytes { - height: 2, - block_bytes, - }, - BlockBytes { - height: 3, - block_bytes, - }, - ]; + let mut unrecorded_blocks: BTreeMap<_, _> = + [(1, block_bytes), (2, block_bytes), (3, block_bytes)] + .into_iter() + .collect(); let mut updater = UpdaterBuilder::new() .with_da_cost_per_byte(da_cost_per_byte) - .with_unrecorded_blocks(unrecorded_blocks) + .with_unrecorded_blocks(&unrecorded_blocks) .build(); let new_cost_per_byte = 100; let recorded_bytes = 500; @@ -390,12 +346,17 @@ fn update_da_record_data__updates_unrecorded_blocks() { // when updater - .update_da_record_data(&recorded_heights, recorded_bytes, recorded_cost) + .update_da_record_data( + &recorded_heights, + recorded_bytes, + recorded_cost, + &mut unrecorded_blocks, + ) .unwrap(); // then let expected = vec![(1, block_bytes)]; - let actual: Vec<_> = updater.unrecorded_blocks.into_iter().collect(); + let actual: Vec<_> = unrecorded_blocks.into_iter().collect(); assert_eq!(actual, expected); } @@ -405,14 +366,11 @@ fn update_da_record_data__da_block_lowers_da_gas_price() { let da_cost_per_byte = 40; let l2_block_height = 11; let original_known_total_cost = 150; - let unrecorded_blocks = vec![BlockBytes { - height: 11, - block_bytes: 3000, - }]; + let mut unrecorded_blocks: BTreeMap<_, _> = [(11, 3000)].into_iter().collect(); let da_p_component = 2; let guessed_cost: u64 = unrecorded_blocks - .iter() - .map(|block| block.block_bytes * da_cost_per_byte) + .values() + .map(|bytes| bytes * da_cost_per_byte) .sum(); let projected_total_cost = original_known_total_cost + guessed_cost; @@ -423,19 +381,19 @@ fn update_da_record_data__da_block_lowers_da_gas_price() { .with_l2_block_height(l2_block_height) .with_projected_total_cost(projected_total_cost as u128) .with_known_total_cost(original_known_total_cost as u128) - .with_unrecorded_blocks(unrecorded_blocks.clone()) + .with_unrecorded_blocks(&unrecorded_blocks) .build(); let new_cost_per_byte = 100; - let (recorded_heights, recorded_cost) = - unrecorded_blocks - .iter() - .fold((vec![], 0), |(mut range, cost), block| { - range.push(block.height); - (range, cost + block.block_bytes * new_cost_per_byte) - }); - let min = recorded_heights.iter().min().unwrap(); - let max = recorded_heights.iter().max().unwrap(); + let (recorded_heights, recorded_cost) = unrecorded_blocks.iter().fold( + (vec![], 0), + |(mut range, cost), (height, bytes)| { + range.push(height); + (range, cost + bytes * new_cost_per_byte) + }, + ); + let min = *recorded_heights.iter().min().unwrap(); + let max = *recorded_heights.iter().max().unwrap(); let recorded_range: Vec = (*min..(max + 1)).collect(); let recorded_bytes = 500; @@ -443,7 +401,12 @@ fn update_da_record_data__da_block_lowers_da_gas_price() { // when updater - .update_da_record_data(&recorded_range, recorded_bytes, recorded_cost as u128) + .update_da_record_data( + &recorded_range, + recorded_bytes, + recorded_cost as u128, + &mut unrecorded_blocks, + ) .unwrap(); // then @@ -459,14 +422,11 @@ fn update_da_record_data__da_block_increases_da_gas_price() { let da_cost_per_byte = 40; let l2_block_height = 11; let original_known_total_cost = 150; - let unrecorded_blocks = vec![BlockBytes { - height: 11, - block_bytes: 3000, - }]; + let mut unrecorded_blocks: BTreeMap<_, _> = [(11, 3000)].into_iter().collect(); let da_p_component = 2; let guessed_cost: u64 = unrecorded_blocks - .iter() - .map(|block| block.block_bytes * da_cost_per_byte) + .values() + .map(|bytes| bytes * da_cost_per_byte) .sum(); let projected_total_cost = original_known_total_cost + guessed_cost; @@ -477,19 +437,20 @@ fn update_da_record_data__da_block_increases_da_gas_price() { .with_l2_block_height(l2_block_height) .with_projected_total_cost(projected_total_cost as u128) .with_known_total_cost(original_known_total_cost as u128) - .with_unrecorded_blocks(unrecorded_blocks.clone()) + .with_unrecorded_blocks(&unrecorded_blocks) .build(); let new_cost_per_byte = 100; - let (recorded_heights, recorded_cost) = - unrecorded_blocks - .iter() - .fold((vec![], 0), |(mut range, cost), block| { - range.push(block.height); - (range, cost + block.block_bytes * new_cost_per_byte) - }); - let min = recorded_heights.iter().min().unwrap(); - let max = recorded_heights.iter().max().unwrap(); + let (recorded_heights, recorded_cost) = unrecorded_blocks.iter().fold( + (vec![], 0), + |(mut range, cost), (height, bytes)| { + range.push(height); + (range, cost + bytes * new_cost_per_byte) + }, + ); + + let min = *recorded_heights.iter().min().unwrap(); + let max = *recorded_heights.iter().max().unwrap(); let recorded_range: Vec = (*min..(max + 1)).collect(); let recorded_bytes = 500; @@ -497,7 +458,12 @@ fn update_da_record_data__da_block_increases_da_gas_price() { // when updater - .update_da_record_data(&recorded_range, recorded_bytes, recorded_cost as u128) + .update_da_record_data( + &recorded_range, + recorded_bytes, + recorded_cost as u128, + &mut unrecorded_blocks, + ) .unwrap(); // then @@ -513,14 +479,11 @@ fn update_da_record_data__da_block_will_not_change_da_gas_price() { let da_cost_per_byte = 40; let l2_block_height = 11; let original_known_total_cost = 150; - let unrecorded_blocks = vec![BlockBytes { - height: 11, - block_bytes: 3000, - }]; + let mut unrecorded_blocks: BTreeMap<_, _> = [(11, 3000)].into_iter().collect(); let da_p_component = 2; let guessed_cost: u64 = unrecorded_blocks - .iter() - .map(|block| block.block_bytes * da_cost_per_byte) + .values() + .map(|bytes| bytes * da_cost_per_byte) .sum(); let projected_total_cost = original_known_total_cost + guessed_cost; @@ -531,19 +494,19 @@ fn update_da_record_data__da_block_will_not_change_da_gas_price() { .with_l2_block_height(l2_block_height) .with_projected_total_cost(projected_total_cost as u128) .with_known_total_cost(original_known_total_cost as u128) - .with_unrecorded_blocks(unrecorded_blocks.clone()) + .with_unrecorded_blocks(&unrecorded_blocks) .build(); let new_cost_per_byte = 100; - let (recorded_heights, recorded_cost) = - unrecorded_blocks - .iter() - .fold((vec![], 0), |(mut range, cost), block| { - range.push(block.height); - (range, cost + block.block_bytes * new_cost_per_byte) - }); - let min = recorded_heights.iter().min().unwrap(); - let max = recorded_heights.iter().max().unwrap(); + let (recorded_heights, recorded_cost) = unrecorded_blocks.iter().fold( + (vec![], 0), + |(mut range, cost), (height, bytes)| { + range.push(height); + (range, cost + bytes * new_cost_per_byte) + }, + ); + let min = *recorded_heights.iter().min().unwrap(); + let max = *recorded_heights.iter().max().unwrap(); let recorded_range: Vec = (*min..(max + 1)).collect(); let recorded_bytes = 500; @@ -551,7 +514,12 @@ fn update_da_record_data__da_block_will_not_change_da_gas_price() { // when updater - .update_da_record_data(&recorded_range, recorded_bytes, recorded_cost as u128) + .update_da_record_data( + &recorded_range, + recorded_bytes, + recorded_cost as u128, + &mut unrecorded_blocks, + ) .unwrap(); // then diff --git a/crates/fuel-gas-price-algorithm/src/v1/tests/update_l2_block_data_tests.rs b/crates/fuel-gas-price-algorithm/src/v1/tests/update_l2_block_data_tests.rs index 8ce3b6b5d4e..6f0dbc4debe 100644 --- a/crates/fuel-gas-price-algorithm/src/v1/tests/update_l2_block_data_tests.rs +++ b/crates/fuel-gas-price-algorithm/src/v1/tests/update_l2_block_data_tests.rs @@ -3,9 +3,12 @@ use crate::v1::{ BlockBytes, UpdaterBuilder, }, + Bytes, Error, + Height, L2ActivityTracker, }; +use std::collections::BTreeMap; fn decrease_l2_activity() -> L2ActivityTracker { let normal = 1; @@ -62,6 +65,10 @@ fn positive_profit_updater_builder() -> UpdaterBuilder { .with_exec_gas_price_change_percent(0) } +fn empty_unrecorded_blocks() -> BTreeMap { + BTreeMap::new() +} + #[test] fn update_l2_block_data__updates_l2_block() { // given @@ -79,7 +86,14 @@ fn update_l2_block_data__updates_l2_block() { // when updater - .update_l2_block_data(height, used, capacity, block_bytes, fee) + .update_l2_block_data( + height, + used, + capacity, + block_bytes, + fee, + &mut empty_unrecorded_blocks(), + ) .unwrap(); // then @@ -104,7 +118,14 @@ fn update_l2_block_data__skipped_block_height_throws_error() { // when let actual_error = updater - .update_l2_block_data(height, used, capacity, block_bytes, fee) + .update_l2_block_data( + height, + used, + capacity, + block_bytes, + fee, + &mut empty_unrecorded_blocks(), + ) .unwrap_err(); // then @@ -128,10 +149,11 @@ fn update_l2_block_data__updates_projected_cost() { let capacity = 100.try_into().unwrap(); let block_bytes = 1000; let fee = 100; + let unrecorded_blocks = &mut empty_unrecorded_blocks(); // when updater - .update_l2_block_data(height, used, capacity, block_bytes, fee) + .update_l2_block_data(height, used, capacity, block_bytes, fee, unrecorded_blocks) .unwrap(); // then @@ -155,10 +177,18 @@ fn update_l2_block_data__updates_the_total_reward_value() { let capacity = 100.try_into().unwrap(); let block_bytes = 1000; let fee = 10_000; + let unrecorded_blocks = &mut empty_unrecorded_blocks(); // when updater - .update_l2_block_data(height, gas_used, capacity, block_bytes, fee) + .update_l2_block_data( + height, + gas_used, + capacity, + block_bytes, + fee, + unrecorded_blocks, + ) .unwrap(); // then @@ -183,10 +213,11 @@ fn update_l2_block_data__even_threshold_will_not_change_exec_gas_price() { let capacity = 100.try_into().unwrap(); let block_bytes = 1000; let fee = 200; + let unrecorded_blocks = &mut empty_unrecorded_blocks(); // when updater - .update_l2_block_data(height, used, capacity, block_bytes, fee) + .update_l2_block_data(height, used, capacity, block_bytes, fee, unrecorded_blocks) .unwrap(); // then @@ -212,10 +243,11 @@ fn update_l2_block_data__below_threshold_will_decrease_exec_gas_price() { let capacity = 100.try_into().unwrap(); let block_bytes = 1000; let fee = 200; + let unrecorded_blocks = &mut empty_unrecorded_blocks(); // when updater - .update_l2_block_data(height, used, capacity, block_bytes, fee) + .update_l2_block_data(height, used, capacity, block_bytes, fee, unrecorded_blocks) .unwrap(); // then @@ -243,10 +275,11 @@ fn update_l2_block_data__above_threshold_will_increase_exec_gas_price() { let capacity = 100.try_into().unwrap(); let block_bytes = 1000; let fee = 200; + let unrecorded_blocks = &mut empty_unrecorded_blocks(); // when updater - .update_l2_block_data(height, used, capacity, block_bytes, fee) + .update_l2_block_data(height, used, capacity, block_bytes, fee, unrecorded_blocks) .unwrap(); // then @@ -275,10 +308,11 @@ fn update_l2_block_data__exec_price_will_not_go_below_min() { let capacity = 100.try_into().unwrap(); let block_bytes = 1000; let fee = 200; + let unrecorded_blocks = &mut empty_unrecorded_blocks(); // when updater - .update_l2_block_data(height, used, capacity, block_bytes, fee) + .update_l2_block_data(height, used, capacity, block_bytes, fee, unrecorded_blocks) .unwrap(); // then @@ -305,10 +339,11 @@ fn update_l2_block_data__updates_last_and_last_last_profit() { let capacity = 100.try_into().unwrap(); let block_bytes = 1000; let fee = 0; // No fee so it's easier to calculate profit + let unrecorded_blocks = &mut empty_unrecorded_blocks(); // when updater - .update_l2_block_data(height, used, capacity, block_bytes, fee) + .update_l2_block_data(height, used, capacity, block_bytes, fee, unrecorded_blocks) .unwrap(); // then @@ -326,6 +361,7 @@ fn update_l2_block_data__positive_profit_decrease_gas_price() { // given let mut updater = positive_profit_updater_builder().build(); let old_gas_price = updater.algorithm().calculate(); + let unrecorded_blocks = &mut empty_unrecorded_blocks(); // when let block_bytes = 500u64; @@ -336,6 +372,7 @@ fn update_l2_block_data__positive_profit_decrease_gas_price() { 100.try_into().unwrap(), block_bytes, 200, + unrecorded_blocks, ) .unwrap(); @@ -374,6 +411,7 @@ fn update_l2_block_data__price_does_not_decrease_more_than_max_percent() { .with_last_profit(last_profit, last_last_profit) .with_da_max_change_percent(max_da_change_percent) .build(); + let unrecorded_blocks = &mut empty_unrecorded_blocks(); // when let height = updater.l2_block_height + 1; @@ -382,7 +420,7 @@ fn update_l2_block_data__price_does_not_decrease_more_than_max_percent() { let block_bytes = 1000; let fee = 200; updater - .update_l2_block_data(height, used, capacity, block_bytes, fee) + .update_l2_block_data(height, used, capacity, block_bytes, fee, unrecorded_blocks) .unwrap(); // then @@ -408,6 +446,7 @@ fn update_l2_block_data__da_price_does_not_increase_more_than_max_percent() { let last_last_profit = 0; let max_da_change_percent = 5; let large_starting_reward = 0; + let unrecorded_blocks = &mut empty_unrecorded_blocks(); let mut updater = UpdaterBuilder::new() .with_starting_exec_gas_price(starting_exec_gas_price) .with_da_p_component(da_p_component) @@ -428,7 +467,7 @@ fn update_l2_block_data__da_price_does_not_increase_more_than_max_percent() { let block_bytes = 1000; let fee = 200; updater - .update_l2_block_data(height, used, capacity, block_bytes, fee) + .update_l2_block_data(height, used, capacity, block_bytes, fee, unrecorded_blocks) .unwrap(); // then @@ -454,6 +493,7 @@ fn update_l2_block_data__never_drops_below_minimum_da_gas_price() { let last_profit = i128::MAX; let avg_window = 10; let large_reward = u128::MAX; + let unrecorded_blocks = &mut empty_unrecorded_blocks(); let mut updater = UpdaterBuilder::new() .with_starting_exec_gas_price(starting_exec_gas_price) .with_min_exec_gas_price(starting_exec_gas_price) @@ -477,6 +517,7 @@ fn update_l2_block_data__never_drops_below_minimum_da_gas_price() { 100.try_into().unwrap(), 1000, fee, + unrecorded_blocks, ) .unwrap(); @@ -497,6 +538,7 @@ fn update_l2_block_data__even_profit_maintains_price() { let da_gas_price_denominator = 1; let block_bytes = 500u64; let starting_reward = starting_cost; + let unrecorded_blocks = &mut empty_unrecorded_blocks(); let mut updater = UpdaterBuilder::new() .with_starting_exec_gas_price(starting_exec_gas_price) .with_starting_da_gas_price(starting_da_gas_price) @@ -518,6 +560,7 @@ fn update_l2_block_data__even_profit_maintains_price() { 100.try_into().unwrap(), block_bytes, total_fee.into(), + unrecorded_blocks, ) .unwrap(); let algo = updater.algorithm(); @@ -534,6 +577,7 @@ fn update_l2_block_data__negative_profit_increase_gas_price() { let mut updater = negative_profit_updater_builder().build(); let algo = updater.algorithm(); let old_gas_price = algo.calculate(); + let unrecorded_blocks = &mut empty_unrecorded_blocks(); // when let height = updater.l2_block_height + 1; @@ -542,7 +586,7 @@ fn update_l2_block_data__negative_profit_increase_gas_price() { let block_bytes = 500u64; let fee = 0; updater - .update_l2_block_data(height, used, capacity, block_bytes, fee) + .update_l2_block_data(height, used, capacity, block_bytes, fee, unrecorded_blocks) .unwrap(); // then @@ -570,10 +614,18 @@ fn update_l2_block_data__adds_l2_block_to_unrecorded_blocks() { let capacity = 100.try_into().unwrap(); let block_bytes = 1000; let new_gas_price = 100; + let unrecorded_blocks = &mut empty_unrecorded_blocks(); // when updater - .update_l2_block_data(height, used, capacity, block_bytes, new_gas_price) + .update_l2_block_data( + height, + used, + capacity, + block_bytes, + new_gas_price, + unrecorded_blocks, + ) .unwrap(); // then @@ -582,7 +634,7 @@ fn update_l2_block_data__adds_l2_block_to_unrecorded_blocks() { block_bytes, }; let expected = block_bytes.block_bytes; - let actual = updater.unrecorded_blocks.get(&block_bytes.height).unwrap(); + let actual = unrecorded_blocks.get(&block_bytes.height).unwrap(); assert_eq!(expected, *actual); } @@ -592,14 +644,13 @@ fn update_l2_block_data__retains_existing_blocks_and_adds_l2_block_to_unrecorded // given let starting_block = 0; let first_block_bytes = 1200; - let preexisting_block = BlockBytes { - height: 0, - block_bytes: first_block_bytes, - }; + let mut unrecorded_blocks: BTreeMap<_, _> = vec![(starting_block, first_block_bytes)] + .into_iter() + .collect(); let mut updater = UpdaterBuilder::new() .with_l2_block_height(starting_block) - .with_unrecorded_blocks(vec![preexisting_block.clone()]) + .with_unrecorded_blocks(&unrecorded_blocks) .build(); let height = 1; @@ -610,7 +661,14 @@ fn update_l2_block_data__retains_existing_blocks_and_adds_l2_block_to_unrecorded // when updater - .update_l2_block_data(height, used, capacity, new_block_bytes, new_gas_price) + .update_l2_block_data( + height, + used, + capacity, + new_block_bytes, + new_gas_price, + &mut unrecorded_blocks, + ) .unwrap(); // then @@ -618,14 +676,12 @@ fn update_l2_block_data__retains_existing_blocks_and_adds_l2_block_to_unrecorded height, block_bytes: new_block_bytes, }; - let contains_block_bytes = - updater.unrecorded_blocks.contains_key(&block_bytes.height); + let contains_block_bytes = unrecorded_blocks.contains_key(&block_bytes.height); assert!(contains_block_bytes); // and - let contains_preexisting_block_bytes = updater - .unrecorded_blocks - .contains_key(&preexisting_block.height); + let contains_preexisting_block_bytes = + unrecorded_blocks.contains_key(&starting_block); assert!(contains_preexisting_block_bytes); // and @@ -653,6 +709,7 @@ fn update_l2_block_data__da_gas_price_wants_to_increase_will_hold_if_activity_in .build(); let algo = updater.algorithm(); let old_gas_price = algo.calculate(); + let unrecorded_blocks = &mut empty_unrecorded_blocks(); // when let height = updater.l2_block_height + 1; @@ -661,7 +718,7 @@ fn update_l2_block_data__da_gas_price_wants_to_increase_will_hold_if_activity_in let block_bytes = 500u64; let fee = 0; updater - .update_l2_block_data(height, used, capacity, block_bytes, fee) + .update_l2_block_data(height, used, capacity, block_bytes, fee, unrecorded_blocks) .unwrap(); // then @@ -679,6 +736,7 @@ fn update_l2_block_data__da_gas_price_wants_to_decrease_will_decrease_if_activit .with_activity(capped_activity) .build(); let old_gas_price = updater.algorithm().calculate(); + let unrecorded_blocks = &mut empty_unrecorded_blocks(); // when let block_bytes = 500u64; @@ -689,6 +747,7 @@ fn update_l2_block_data__da_gas_price_wants_to_decrease_will_decrease_if_activit 100.try_into().unwrap(), block_bytes, 200, + unrecorded_blocks, ) .unwrap(); @@ -712,6 +771,7 @@ fn update_l2_block_data__da_gas_price_wants_to_increase_will_decrease_if_activit .build(); let algo = updater.algorithm(); let old_gas_price = algo.calculate(); + let unrecorded_blocks = &mut empty_unrecorded_blocks(); // when let height = updater.l2_block_height + 1; @@ -720,7 +780,7 @@ fn update_l2_block_data__da_gas_price_wants_to_increase_will_decrease_if_activit let block_bytes = 500u64; let fee = 0; updater - .update_l2_block_data(height, used, capacity, block_bytes, fee) + .update_l2_block_data(height, used, capacity, block_bytes, fee, unrecorded_blocks) .unwrap(); // then @@ -743,6 +803,7 @@ fn update_l2_block_data__da_gas_price_wants_to_decrease_will_decrease_if_activit .with_activity(decrease_activity) .build(); let old_gas_price = updater.algorithm().calculate(); + let unrecorded_blocks = &mut empty_unrecorded_blocks(); // when let block_bytes = 500u64; @@ -753,6 +814,7 @@ fn update_l2_block_data__da_gas_price_wants_to_decrease_will_decrease_if_activit 100.try_into().unwrap(), block_bytes, 200, + unrecorded_blocks, ) .unwrap(); @@ -786,10 +848,11 @@ fn update_l2_block_data__above_threshold_increase_activity() { let capacity = 100.try_into().unwrap(); let block_bytes = 1000; let fee = 200; + let unrecorded_blocks = &mut empty_unrecorded_blocks(); // when updater - .update_l2_block_data(height, used, capacity, block_bytes, fee) + .update_l2_block_data(height, used, capacity, block_bytes, fee, unrecorded_blocks) .unwrap(); // then @@ -818,10 +881,11 @@ fn update_l2_block_data__below_threshold_decrease_activity() { let capacity = 100.try_into().unwrap(); let block_bytes = 1000; let fee = 200; + let unrecorded_blocks = &mut empty_unrecorded_blocks(); // when updater - .update_l2_block_data(height, used, capacity, block_bytes, fee) + .update_l2_block_data(height, used, capacity, block_bytes, fee, unrecorded_blocks) .unwrap(); // then @@ -859,10 +923,11 @@ fn update_l2_block_data__if_activity_at_max_will_stop_increasing() { let capacity = 100.try_into().unwrap(); let block_bytes = 1000; let fee = 200; + let unrecorded_blocks = &mut empty_unrecorded_blocks(); // when updater - .update_l2_block_data(height, used, capacity, block_bytes, fee) + .update_l2_block_data(height, used, capacity, block_bytes, fee, unrecorded_blocks) .unwrap(); // then @@ -891,10 +956,11 @@ fn update_l2_block_data__if_activity_is_zero_will_stop_decreasing() { let capacity = 100.try_into().unwrap(); let block_bytes = 1000; let fee = 200; + let unrecorded_blocks = &mut empty_unrecorded_blocks(); // when updater - .update_l2_block_data(height, used, capacity, block_bytes, fee) + .update_l2_block_data(height, used, capacity, block_bytes, fee, unrecorded_blocks) .unwrap(); // then diff --git a/crates/services/gas_price_service/src/common/fuel_core_storage_adapter.rs b/crates/services/gas_price_service/src/common/fuel_core_storage_adapter.rs index 4d452421ba1..74bec1207d2 100644 --- a/crates/services/gas_price_service/src/common/fuel_core_storage_adapter.rs +++ b/crates/services/gas_price_service/src/common/fuel_core_storage_adapter.rs @@ -1,40 +1,49 @@ -use crate::common::utils::{ - BlockInfo, - Error as GasPriceError, - Result as GasPriceResult, -}; -use anyhow::anyhow; -use fuel_core_types::fuel_types::BlockHeight; - use crate::{ common::{ fuel_core_storage_adapter::storage::{ + BundleIdTable, GasPriceColumn, GasPriceMetadata, }, updater_metadata::UpdaterMetadata, + utils::{ + BlockInfo, + Error as GasPriceError, + Result as GasPriceResult, + }, + }, + ports::{ + GasPriceServiceAtomicStorage, + GetDaBundleId, + GetMetadataStorage, + SetDaBundleId, + SetMetadataStorage, }, - ports::MetadataStorage, }; +use anyhow::anyhow; +use core::cmp::min; use fuel_core_storage::{ codec::{ postcard::Postcard, Encode, }, kv_store::KeyValueInspect, - structured_storage::StructuredStorage, transactional::{ Modifiable, + StorageTransaction, WriteTransaction, }, + Error as StorageError, StorageAsMut, StorageAsRef, + StorageInspect, }; use fuel_core_types::{ blockchain::{ block::Block, header::ConsensusParametersVersion, }, + fuel_merkle::storage::StorageMutate, fuel_tx::{ field::{ MintAmount, @@ -42,18 +51,39 @@ use fuel_core_types::{ }, Transaction, }, + fuel_types::BlockHeight, }; -use std::cmp::min; #[cfg(test)] mod metadata_tests; pub mod storage; -impl MetadataStorage for StructuredStorage +impl SetMetadataStorage for Storage +where + Storage: Send + Sync, + Storage: Modifiable, + for<'a> StorageTransaction<&'a mut Storage>: + StorageMutate, +{ + fn set_metadata(&mut self, metadata: &UpdaterMetadata) -> GasPriceResult<()> { + let block_height = metadata.l2_block_height(); + let mut tx = self.write_transaction(); + tx.storage_as_mut::() + .insert(&block_height, metadata) + .and_then(|_| tx.commit()) + .map_err(|err| GasPriceError::CouldNotSetMetadata { + block_height, + source_error: err.into(), + })?; + Ok(()) + } +} + +impl GetMetadataStorage for Storage where - Storage: KeyValueInspect + Modifiable, Storage: Send + Sync, + Storage: StorageInspect, { fn get_metadata( &self, @@ -67,17 +97,57 @@ where })?; Ok(metadata.map(|inner| inner.into_owned())) } +} - fn set_metadata(&mut self, metadata: &UpdaterMetadata) -> GasPriceResult<()> { - let block_height = metadata.l2_block_height(); - let mut tx = self.write_transaction(); - tx.storage_as_mut::() - .insert(&block_height, metadata) - .and_then(|_| tx.commit()) - .map_err(|err| GasPriceError::CouldNotSetMetadata { - block_height, - source_error: err.into(), - })?; +impl GetDaBundleId for Storage +where + Storage: Send + Sync, + Storage: StorageInspect, +{ + fn get_bundle_id(&self, block_height: &BlockHeight) -> GasPriceResult> { + let bundle_id = self + .storage::() + .get(block_height) + .map_err(|err| GasPriceError::CouldNotFetchDARecord(err.into()))? + .map(|no| *no); + Ok(bundle_id) + } +} + +impl GasPriceServiceAtomicStorage for Storage +where + Storage: 'static, + Storage: GetMetadataStorage + GetDaBundleId, + Storage: KeyValueInspect + Modifiable + Send + Sync, +{ + type Transaction<'a> = StorageTransaction<&'a mut Storage> where Self: 'a; + + fn begin_transaction(&mut self) -> GasPriceResult> { + let tx = self.write_transaction(); + Ok(tx) + } + + fn commit_transaction(transaction: Self::Transaction<'_>) -> GasPriceResult<()> { + transaction + .commit() + .map_err(|err| GasPriceError::CouldNotCommit(err.into()))?; + Ok(()) + } +} + +impl SetDaBundleId for Storage +where + Storage: Send + Sync, + Storage: StorageMutate, +{ + fn set_bundle_id( + &mut self, + block_height: &BlockHeight, + bundle_id: u32, + ) -> GasPriceResult<()> { + self.storage_as_mut::() + .insert(block_height, &bundle_id) + .map_err(|err| GasPriceError::CouldNotFetchDARecord(err.into()))?; Ok(()) } } @@ -87,6 +157,7 @@ pub struct GasPriceSettings { pub gas_price_factor: u64, pub block_gas_limit: u64, } + pub trait GasPriceSettingsProvider: Send + Sync + Clone { fn settings( &self, diff --git a/crates/services/gas_price_service/src/common/fuel_core_storage_adapter/storage.rs b/crates/services/gas_price_service/src/common/fuel_core_storage_adapter/storage.rs index 43d6835dcc1..e9f20b411d8 100644 --- a/crates/services/gas_price_service/src/common/fuel_core_storage_adapter/storage.rs +++ b/crates/services/gas_price_service/src/common/fuel_core_storage_adapter/storage.rs @@ -27,6 +27,8 @@ use fuel_core_types::fuel_types::BlockHeight; pub enum GasPriceColumn { Metadata = 0, State = 1, + UnrecordedBlocks = 2, + BundleId = 3, } impl GasPriceColumn { @@ -68,3 +70,46 @@ impl TableWithBlueprint for GasPriceMetadata { GasPriceColumn::State } } + +/// The storage for all the unrecorded blocks from gas price algorithm, used for guessing the cost +/// for future blocks to be recorded on the DA chain +pub struct UnrecordedBlocksTable; + +type BlockSizeInBytes = u64; + +impl Mappable for UnrecordedBlocksTable { + type Key = Self::OwnedKey; + type OwnedKey = BlockHeight; + type Value = Self::OwnedValue; + type OwnedValue = BlockSizeInBytes; +} + +impl TableWithBlueprint for UnrecordedBlocksTable { + type Blueprint = Plain, Postcard>; + type Column = GasPriceColumn; + + fn column() -> Self::Column { + GasPriceColumn::UnrecordedBlocks + } +} + +pub struct BundleIdTable; + +/// The sequence number or bundle id of the posted blocks. +type BundleId = u32; + +impl Mappable for BundleIdTable { + type Key = Self::OwnedKey; + type OwnedKey = BlockHeight; + type Value = Self::OwnedValue; + type OwnedValue = BundleId; +} + +impl TableWithBlueprint for BundleIdTable { + type Blueprint = Plain, Postcard>; + type Column = GasPriceColumn; + + fn column() -> Self::Column { + GasPriceColumn::BundleId + } +} diff --git a/crates/services/gas_price_service/src/common/utils.rs b/crates/services/gas_price_service/src/common/utils.rs index a3813e53e4f..b57fc2608a2 100644 --- a/crates/services/gas_price_service/src/common/utils.rs +++ b/crates/services/gas_price_service/src/common/utils.rs @@ -19,6 +19,8 @@ pub enum Error { CouldNotInitUpdater(anyhow::Error), #[error("Failed to convert metadata to concrete type. THere is no migration path for this metadata version")] CouldNotConvertMetadata, // todo(https://github.com/FuelLabs/fuel-core/issues/2286) + #[error("Failed to commit to storage: {0:?}")] + CouldNotCommit(anyhow::Error), } pub type Result = core::result::Result; diff --git a/crates/services/gas_price_service/src/ports.rs b/crates/services/gas_price_service/src/ports.rs index cb829ab21fb..0ccd77cb01a 100644 --- a/crates/services/gas_price_service/src/ports.rs +++ b/crates/services/gas_price_service/src/ports.rs @@ -1,16 +1,20 @@ +use fuel_core_storage::Result as StorageResult; +use fuel_core_types::{ + blockchain::block::Block, + fuel_tx::Transaction, + fuel_types::BlockHeight, +}; + use crate::{ common::{ updater_metadata::UpdaterMetadata, utils::Result, }, v0::metadata::V0AlgorithmConfig, - v1::metadata::V1AlgorithmConfig, -}; -use fuel_core_storage::Result as StorageResult; -use fuel_core_types::{ - blockchain::block::Block, - fuel_tx::Transaction, - fuel_types::BlockHeight, + v1::{ + metadata::V1AlgorithmConfig, + uninitialized_task::fuel_storage_unrecorded_blocks::AsUnrecordedBlocks, + }, }; pub trait L2Data: Send + Sync { @@ -21,10 +25,41 @@ pub trait L2Data: Send + Sync { ) -> StorageResult>>; } -pub trait MetadataStorage: Send + Sync { +pub trait SetMetadataStorage: Send + Sync { + fn set_metadata(&mut self, metadata: &UpdaterMetadata) -> Result<()>; +} + +pub trait GetMetadataStorage: Send + Sync { fn get_metadata(&self, block_height: &BlockHeight) -> Result>; - fn set_metadata(&mut self, metadata: &UpdaterMetadata) -> Result<()>; +} + +pub trait SetDaBundleId: Send + Sync { + fn set_bundle_id(&mut self, block_height: &BlockHeight, bundle_id: u32) + -> Result<()>; +} + +pub trait GetDaBundleId: Send + Sync { + fn get_bundle_id(&self, block_height: &BlockHeight) -> Result>; +} + +pub trait GasPriceServiceAtomicStorage +where + Self: 'static, + Self: Send + Sync, + Self: GetMetadataStorage + GetDaBundleId, +{ + type Transaction<'a>: AsUnrecordedBlocks + + SetMetadataStorage + + GetMetadataStorage + + SetDaBundleId + + GetDaBundleId + where + Self: 'a; + + fn begin_transaction(&mut self) -> Result>; + + fn commit_transaction(transaction: Self::Transaction<'_>) -> Result<()>; } /// Provides the latest block height. diff --git a/crates/services/gas_price_service/src/v0/service.rs b/crates/services/gas_price_service/src/v0/service.rs index 557d8cd6842..c4237b1141e 100644 --- a/crates/services/gas_price_service/src/v0/service.rs +++ b/crates/services/gas_price_service/src/v0/service.rs @@ -4,7 +4,10 @@ use crate::{ updater_metadata::UpdaterMetadata, utils::BlockInfo, }, - ports::MetadataStorage, + ports::{ + GetMetadataStorage, + SetMetadataStorage, + }, v0::algorithm::SharedV0Algorithm, }; use anyhow::anyhow; @@ -35,7 +38,7 @@ pub struct GasPriceServiceV0 { impl GasPriceServiceV0 where - Metadata: MetadataStorage, + Metadata: GetMetadataStorage + SetMetadataStorage, { pub fn new( l2_block_source: L2, @@ -120,7 +123,7 @@ where impl GasPriceServiceV0 where L2: L2BlockSource, - Metadata: MetadataStorage, + Metadata: GetMetadataStorage + SetMetadataStorage, { async fn process_l2_block_res( &mut self, @@ -138,7 +141,7 @@ where impl RunnableTask for GasPriceServiceV0 where L2: L2BlockSource, - Metadata: MetadataStorage, + Metadata: GetMetadataStorage + SetMetadataStorage, { async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction { tracing::trace!("Call of `run` function of the gas price service v0"); @@ -178,7 +181,10 @@ mod tests { Result as GasPriceResult, }, }, - ports::MetadataStorage, + ports::{ + GetMetadataStorage, + SetMetadataStorage, + }, v0::{ metadata::V0AlgorithmConfig, service::GasPriceServiceV0, @@ -217,7 +223,14 @@ mod tests { } } - impl MetadataStorage for FakeMetadata { + impl SetMetadataStorage for FakeMetadata { + fn set_metadata(&mut self, metadata: &UpdaterMetadata) -> GasPriceResult<()> { + *self.inner.lock().unwrap() = Some(metadata.clone()); + Ok(()) + } + } + + impl GetMetadataStorage for FakeMetadata { fn get_metadata( &self, _: &BlockHeight, @@ -225,11 +238,6 @@ mod tests { let metadata = self.inner.lock().unwrap().clone(); Ok(metadata) } - - fn set_metadata(&mut self, metadata: &UpdaterMetadata) -> GasPriceResult<()> { - *self.inner.lock().unwrap() = Some(metadata.clone()); - Ok(()) - } } #[tokio::test] diff --git a/crates/services/gas_price_service/src/v0/tests.rs b/crates/services/gas_price_service/src/v0/tests.rs index 294c5029a4d..d29116e59d8 100644 --- a/crates/services/gas_price_service/src/v0/tests.rs +++ b/crates/services/gas_price_service/src/v0/tests.rs @@ -16,8 +16,9 @@ use crate::{ }, ports::{ GasPriceData, + GetMetadataStorage, L2Data, - MetadataStorage, + SetMetadataStorage, }, v0::{ metadata::{ @@ -86,27 +87,23 @@ impl FakeMetadata { } } -impl MetadataStorage for FakeMetadata { - fn get_metadata(&self, _: &BlockHeight) -> GasPriceResult> { - let metadata = self.inner.lock().unwrap().clone(); - Ok(metadata) - } - +impl SetMetadataStorage for FakeMetadata { fn set_metadata(&mut self, metadata: &UpdaterMetadata) -> GasPriceResult<()> { *self.inner.lock().unwrap() = Some(metadata.clone()); Ok(()) } } -struct ErroringMetadata; - -impl MetadataStorage for ErroringMetadata { +impl GetMetadataStorage for FakeMetadata { fn get_metadata(&self, _: &BlockHeight) -> GasPriceResult> { - Err(GasPriceError::CouldNotFetchMetadata { - source_error: anyhow!("boo!"), - }) + let metadata = self.inner.lock().unwrap().clone(); + Ok(metadata) } +} + +struct ErroringMetadata; +impl SetMetadataStorage for ErroringMetadata { fn set_metadata(&mut self, _: &UpdaterMetadata) -> GasPriceResult<()> { Err(GasPriceError::CouldNotSetMetadata { block_height: Default::default(), @@ -115,6 +112,14 @@ impl MetadataStorage for ErroringMetadata { } } +impl GetMetadataStorage for ErroringMetadata { + fn get_metadata(&self, _: &BlockHeight) -> GasPriceResult> { + Err(GasPriceError::CouldNotFetchMetadata { + source_error: anyhow!("boo!"), + }) + } +} + fn arbitrary_config() -> V0AlgorithmConfig { V0AlgorithmConfig { starting_gas_price: 100, diff --git a/crates/services/gas_price_service/src/v0/uninitialized_task.rs b/crates/services/gas_price_service/src/v0/uninitialized_task.rs index 569739605c7..0dd0e395d68 100644 --- a/crates/services/gas_price_service/src/v0/uninitialized_task.rs +++ b/crates/services/gas_price_service/src/v0/uninitialized_task.rs @@ -18,7 +18,7 @@ use crate::{ GasPriceData, GasPriceServiceConfig, L2Data, - MetadataStorage, + SetMetadataStorage, }, v0::{ algorithm::SharedV0Algorithm, @@ -45,6 +45,7 @@ use fuel_core_types::{ }; use fuel_gas_price_algorithm::v0::AlgorithmUpdaterV0; +use crate::ports::GetMetadataStorage; pub use fuel_gas_price_algorithm::v0::AlgorithmV0; pub struct UninitializedTask { @@ -65,7 +66,7 @@ where L2DataStore: L2Data, L2DataStoreView: AtomicView, GasPriceStore: GasPriceData, - Metadata: MetadataStorage, + Metadata: GetMetadataStorage + SetMetadataStorage, SettingsProvider: GasPriceSettingsProvider, { pub fn new( @@ -166,7 +167,7 @@ where L2DataStore: L2Data, L2DataStoreView: AtomicView, GasPriceStore: GasPriceData, - Metadata: MetadataStorage, + Metadata: GetMetadataStorage + SetMetadataStorage, SettingsProvider: GasPriceSettingsProvider, { const NAME: &'static str = "GasPriceServiceV0"; @@ -193,7 +194,7 @@ pub fn initialize_algorithm( metadata_storage: &Metadata, ) -> GasPriceResult<(AlgorithmUpdaterV0, SharedV0Algorithm)> where - Metadata: MetadataStorage, + Metadata: GetMetadataStorage + SetMetadataStorage, { let min_exec_gas_price = config.min_gas_price; let exec_gas_price_change_percent = config.gas_price_change_percent; @@ -243,7 +244,7 @@ fn sync_gas_price_db_with_on_chain_storage< where L2DataStore: L2Data, L2DataStoreView: AtomicView, - Metadata: MetadataStorage, + Metadata: GetMetadataStorage + SetMetadataStorage, SettingsProvider: GasPriceSettingsProvider, { let metadata = metadata_storage @@ -287,7 +288,7 @@ fn sync_v0_metadata( where L2DataStore: L2Data, L2DataStoreView: AtomicView, - Metadata: MetadataStorage, + Metadata: SetMetadataStorage, SettingsProvider: GasPriceSettingsProvider, { let first = metadata_height.saturating_add(1); @@ -344,7 +345,7 @@ where L2DataStoreView: AtomicView, GasPriceStore: GasPriceData, SettingsProvider: GasPriceSettingsProvider, - Metadata: MetadataStorage, + Metadata: GetMetadataStorage + SetMetadataStorage, { let v0_config = config.v0().ok_or(anyhow::anyhow!("Expected V0 config"))?; let gas_price_init = UninitializedTask::new( diff --git a/crates/services/gas_price_service/src/v1/da_source_service.rs b/crates/services/gas_price_service/src/v1/da_source_service.rs index ff2c703b518..bae8d3d46bc 100644 --- a/crates/services/gas_price_service/src/v1/da_source_service.rs +++ b/crates/services/gas_price_service/src/v1/da_source_service.rs @@ -2,13 +2,15 @@ use crate::v1::da_source_service::service::DaBlockCostsSource; use std::time::Duration; pub mod block_committer_costs; +#[cfg(test)] pub mod dummy_costs; pub mod service; #[derive(Debug, Default, Clone, Eq, Hash, PartialEq)] pub struct DaBlockCosts { + pub bundle_id: u32, pub l2_blocks: Vec, - pub blob_size_bytes: u32, + pub bundle_size_bytes: u32, pub blob_cost_wei: u128, } @@ -30,8 +32,9 @@ mod tests { async fn run__when_da_block_cost_source_gives_value_shared_state_is_updated() { // given let expected_da_cost = DaBlockCosts { + bundle_id: 1, l2_blocks: (0..10).collect(), - blob_size_bytes: 1024 * 128, + bundle_size_bytes: 1024 * 128, blob_cost_wei: 2, }; let notifier = Arc::new(tokio::sync::Notify::new()); diff --git a/crates/services/gas_price_service/src/v1/da_source_service/block_committer_costs.rs b/crates/services/gas_price_service/src/v1/da_source_service/block_committer_costs.rs index 59498849996..98c024933c8 100644 --- a/crates/services/gas_price_service/src/v1/da_source_service/block_committer_costs.rs +++ b/crates/services/gas_price_service/src/v1/da_source_service/block_committer_costs.rs @@ -40,7 +40,7 @@ pub struct BlockCommitterDaBlockCosts { #[derive(Debug, Deserialize, Serialize, Clone, Default, PartialEq)] pub struct RawDaBlockCosts { /// Sequence number (Monotonically increasing nonce) - pub sequence_number: u32, + pub bundle_id: u32, /// The range of blocks that the costs apply to pub blocks_heights: Vec, /// The DA block height of the last transaction for the range of blocks @@ -54,12 +54,13 @@ pub struct RawDaBlockCosts { impl From<&RawDaBlockCosts> for DaBlockCosts { fn from(raw_da_block_costs: &RawDaBlockCosts) -> Self { DaBlockCosts { + bundle_id: raw_da_block_costs.bundle_id, l2_blocks: raw_da_block_costs .blocks_heights .clone() .into_iter() .collect(), - blob_size_bytes: raw_da_block_costs.total_size_bytes, + bundle_size_bytes: raw_da_block_costs.total_size_bytes, blob_cost_wei: raw_da_block_costs.total_cost, } } @@ -82,9 +83,9 @@ where { async fn request_da_block_cost(&mut self) -> DaBlockCostsResult { let raw_da_block_costs = match self.last_raw_da_block_costs { - Some(ref last_value) => self - .client - .get_costs_by_seqno(last_value.sequence_number + 1), + Some(ref last_value) => { + self.client.get_costs_by_seqno(last_value.bundle_id + 1) + } _ => self.client.get_latest_costs(), } .await?; @@ -98,7 +99,7 @@ where |costs: DaBlockCostsResult, last_value| { let costs = costs.expect("Defined to be OK"); let blob_size_bytes = costs - .blob_size_bytes + .bundle_size_bytes .checked_sub(last_value.total_size_bytes) .ok_or(anyhow!("Blob size bytes underflow"))?; let blob_cost_wei = raw_da_block_costs @@ -106,7 +107,7 @@ where .checked_sub(last_value.total_cost) .ok_or(anyhow!("Blob cost wei underflow"))?; Ok(DaBlockCosts { - blob_size_bytes, + bundle_size_bytes: blob_size_bytes, blob_cost_wei, ..costs }) @@ -116,6 +117,10 @@ where self.last_raw_da_block_costs = Some(raw_da_block_costs.clone()); Ok(da_block_costs) } + async fn set_last_value(&mut self, bundle_id: u32) -> DaBlockCostsResult<()> { + self.last_raw_da_block_costs = self.client.get_costs_by_seqno(bundle_id).await?; + Ok(()) + } } pub struct BlockCommitterHttpApi { @@ -201,7 +206,7 @@ mod tests { // arbitrary logic to generate a new value let mut value = self.value.clone(); if let Some(value) = &mut value { - value.sequence_number = seq_no; + value.bundle_id = seq_no; value.blocks_heights = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] .to_vec() .iter() @@ -224,7 +229,7 @@ mod tests { fn test_da_block_costs() -> RawDaBlockCosts { RawDaBlockCosts { - sequence_number: 1, + bundle_id: 1, blocks_heights: (0..10).collect(), da_block_height: 1u64.into(), total_cost: 1, @@ -299,7 +304,7 @@ mod tests { // arbitrary logic to generate a new value let mut value = self.value.clone(); if let Some(value) = &mut value { - value.sequence_number = seq_no; + value.bundle_id = seq_no; value.blocks_heights = value.blocks_heights.iter().map(|x| x + seq_no).collect(); value.da_block_height = value.da_block_height + 1u64.into(); diff --git a/crates/services/gas_price_service/src/v1/da_source_service/dummy_costs.rs b/crates/services/gas_price_service/src/v1/da_source_service/dummy_costs.rs index 8fe22a89e0b..5204ea5fba0 100644 --- a/crates/services/gas_price_service/src/v1/da_source_service/dummy_costs.rs +++ b/crates/services/gas_price_service/src/v1/da_source_service/dummy_costs.rs @@ -34,4 +34,8 @@ impl DaBlockCostsSource for DummyDaBlockCosts { } } } + + async fn set_last_value(&mut self, _bundle_id: u32) -> DaBlockCostsResult<()> { + unimplemented!("This is a dummy implementation"); + } } diff --git a/crates/services/gas_price_service/src/v1/da_source_service/service.rs b/crates/services/gas_price_service/src/v1/da_source_service/service.rs index 328f73e6a20..d7dfca30a20 100644 --- a/crates/services/gas_price_service/src/v1/da_source_service/service.rs +++ b/crates/services/gas_price_service/src/v1/da_source_service/service.rs @@ -32,10 +32,7 @@ impl SharedState { /// This struct houses the shared_state, polling interval /// and a source, which does the actual fetching of the data -pub struct DaSourceService -where - Source: DaBlockCostsSource, -{ +pub struct DaSourceService { poll_interval: Interval, source: Source, shared_state: SharedState, @@ -72,6 +69,7 @@ where #[async_trait::async_trait] pub trait DaBlockCostsSource: Send + Sync { async fn request_da_block_cost(&mut self) -> Result; + async fn set_last_value(&mut self, bundle_id: u32) -> Result<()>; } #[async_trait::async_trait] diff --git a/crates/services/gas_price_service/src/v1/metadata.rs b/crates/services/gas_price_service/src/v1/metadata.rs index 54000623bac..ef2e88466f5 100644 --- a/crates/services/gas_price_service/src/v1/metadata.rs +++ b/crates/services/gas_price_service/src/v1/metadata.rs @@ -3,10 +3,7 @@ use fuel_gas_price_algorithm::v1::{ AlgorithmUpdaterV1, L2ActivityTracker, }; -use std::{ - collections::BTreeMap, - num::NonZeroU64, -}; +use std::num::NonZeroU64; #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)] pub struct V1Metadata { @@ -30,9 +27,9 @@ pub struct V1Metadata { pub second_to_last_profit: i128, /// The latest known cost per byte for recording blocks on the DA chain pub latest_da_cost_per_byte: u128, - /// List of (height, size) for l2 blocks that have not been recorded on the DA chain (that we know), + /// Track the total bytes of all l2 blocks that have not been recorded on the DA chain (that we know), /// but have been used to estimate the cost of recording blocks on the DA chain - pub unrecorded_blocks: Vec<(u32, u64)>, + pub unrecorded_block_bytes: u128, } impl V1Metadata { @@ -54,17 +51,20 @@ impl V1Metadata { last_profit: 0, second_to_last_profit: 0, latest_da_cost_per_byte: 0, - unrecorded_blocks: vec![], + unrecorded_block_bytes: 0, }; Ok(metadata) } } +#[derive(Debug, Clone, PartialEq)] pub struct V1AlgorithmConfig { pub new_exec_gas_price: u64, pub min_exec_gas_price: u64, pub exec_gas_price_change_percent: u16, pub l2_block_fullness_threshold_percent: u8, + // TODO:We don't need this after we implement + // https://github.com/FuelLabs/fuel-core/issues/2481 pub gas_price_factor: NonZeroU64, pub min_da_gas_price: u64, pub max_da_gas_price_change_percent: u16, @@ -74,51 +74,40 @@ pub struct V1AlgorithmConfig { pub capped_range_size: u16, pub decrease_range_size: u16, pub block_activity_threshold: u8, - pub unrecorded_blocks: Vec<(u32, u64)>, } -impl From<&V1AlgorithmConfig> for AlgorithmUpdaterV1 { - fn from(value: &V1AlgorithmConfig) -> Self { - let l2_activity = L2ActivityTracker::new_full( - value.normal_range_size, - value.capped_range_size, - value.decrease_range_size, - value.block_activity_threshold.into(), - ); - let unrecorded_blocks: BTreeMap<_, _> = - value.unrecorded_blocks.clone().into_iter().collect(); - let unrecorded_blocks_bytes: u128 = unrecorded_blocks - .values() - .map(|size| u128::from(*size)) - .sum(); - Self { - // TODO:We don't need this after we implement - // https://github.com/FuelLabs/fuel-core/issues/2481 - new_scaled_exec_price: value - .new_exec_gas_price - .saturating_mul(value.gas_price_factor.get()), - l2_block_height: 0, - new_scaled_da_gas_price: value.min_da_gas_price, - gas_price_factor: value.gas_price_factor, - total_da_rewards_excess: 0, - latest_known_total_da_cost_excess: 0, - projected_total_da_cost: 0, - last_profit: 0, - second_to_last_profit: 0, - latest_da_cost_per_byte: 0, - l2_activity, - min_exec_gas_price: value.min_exec_gas_price, - exec_gas_price_change_percent: value.exec_gas_price_change_percent, - l2_block_fullness_threshold_percent: value - .l2_block_fullness_threshold_percent - .into(), - min_da_gas_price: value.min_da_gas_price, - max_da_gas_price_change_percent: value.max_da_gas_price_change_percent, - da_p_component: value.da_p_component, - da_d_component: value.da_d_component, - unrecorded_blocks, - unrecorded_blocks_bytes, - } +pub fn updater_from_config(value: &V1AlgorithmConfig) -> AlgorithmUpdaterV1 { + let l2_activity = L2ActivityTracker::new_full( + value.normal_range_size, + value.capped_range_size, + value.decrease_range_size, + value.block_activity_threshold.into(), + ); + let unrecorded_blocks_bytes = 0; + AlgorithmUpdaterV1 { + new_scaled_exec_price: value + .new_exec_gas_price + .saturating_mul(value.gas_price_factor.get()), + l2_block_height: 0, + new_scaled_da_gas_price: value.min_da_gas_price, + gas_price_factor: value.gas_price_factor, + total_da_rewards_excess: 0, + latest_known_total_da_cost_excess: 0, + projected_total_da_cost: 0, + last_profit: 0, + second_to_last_profit: 0, + latest_da_cost_per_byte: 0, + l2_activity, + min_exec_gas_price: value.min_exec_gas_price, + exec_gas_price_change_percent: value.exec_gas_price_change_percent, + l2_block_fullness_threshold_percent: value + .l2_block_fullness_threshold_percent + .into(), + min_da_gas_price: value.min_da_gas_price, + max_da_gas_price_change_percent: value.max_da_gas_price_change_percent, + da_p_component: value.da_p_component, + da_d_component: value.da_d_component, + unrecorded_blocks_bytes, } } @@ -134,7 +123,7 @@ impl From for V1Metadata { last_profit: updater.last_profit, second_to_last_profit: updater.second_to_last_profit, latest_da_cost_per_byte: updater.latest_da_cost_per_byte, - unrecorded_blocks: updater.unrecorded_blocks.into_iter().collect(), + unrecorded_block_bytes: updater.unrecorded_blocks_bytes, } } } @@ -149,17 +138,12 @@ pub fn v1_algorithm_from_metadata( config.decrease_range_size, config.block_activity_threshold.into(), ); - let unrecorded_blocks_bytes: u128 = metadata - .unrecorded_blocks - .iter() - .map(|(_, size)| u128::from(*size)) - .sum(); + let unrecorded_blocks_bytes: u128 = metadata.unrecorded_block_bytes; let projected_portion = unrecorded_blocks_bytes.saturating_mul(metadata.latest_da_cost_per_byte); let projected_total_da_cost = metadata .latest_known_total_da_cost_excess .saturating_add(projected_portion); - let unrecorded_blocks = metadata.unrecorded_blocks.into_iter().collect(); AlgorithmUpdaterV1 { new_scaled_exec_price: metadata.new_scaled_exec_price, l2_block_height: metadata.l2_block_height, @@ -181,7 +165,6 @@ pub fn v1_algorithm_from_metadata( max_da_gas_price_change_percent: config.max_da_gas_price_change_percent, da_p_component: config.da_p_component, da_d_component: config.da_d_component, - unrecorded_blocks, unrecorded_blocks_bytes, } } diff --git a/crates/services/gas_price_service/src/v1/service.rs b/crates/services/gas_price_service/src/v1/service.rs index f95649144a3..df0c91a5c93 100644 --- a/crates/services/gas_price_service/src/v1/service.rs +++ b/crates/services/gas_price_service/src/v1/service.rs @@ -1,11 +1,22 @@ +use std::num::NonZeroU64; + use crate::{ common::{ gas_price_algorithm::SharedGasPriceAlgo, l2_block_source::L2BlockSource, updater_metadata::UpdaterMetadata, - utils::BlockInfo, + utils::{ + BlockInfo, + Result as GasPriceResult, + }, + }, + ports::{ + GasPriceServiceAtomicStorage, + GetDaBundleId, + GetMetadataStorage, + SetDaBundleId, + SetMetadataStorage, }, - ports::MetadataStorage, v0::metadata::V0Metadata, v1::{ algorithm::SharedV1Algorithm, @@ -18,10 +29,15 @@ use crate::{ DaBlockCosts, }, metadata::{ + updater_from_config, v1_algorithm_from_metadata, V1AlgorithmConfig, V1Metadata, }, + uninitialized_task::fuel_storage_unrecorded_blocks::{ + AsUnrecordedBlocks, + FuelStorageUnrecordedBlocks, + }, }, }; use anyhow::anyhow; @@ -32,48 +48,43 @@ use fuel_core_services::{ StateWatcher, TaskNextAction, }; +use fuel_core_types::fuel_types::BlockHeight; use fuel_gas_price_algorithm::{ v0::AlgorithmUpdaterV0, v1::{ AlgorithmUpdaterV1, AlgorithmV1, + UnrecordedBlocks, }, }; use futures::FutureExt; -use std::num::NonZeroU64; -use tokio::sync::broadcast::{ - error::RecvError, - Receiver, -}; - -use crate::common::utils::Result as GasPriceResult; +use tokio::sync::broadcast::Receiver; /// The service that updates the gas price algorithm. -pub struct GasPriceServiceV1 -where - DA: DaBlockCostsSource, -{ +pub struct GasPriceServiceV1 { /// The algorithm that can be used in the next block shared_algo: SharedV1Algorithm, /// The L2 block source l2_block_source: L2, - /// The metadata storage - metadata_storage: Metadata, /// The algorithm updater algorithm_updater: AlgorithmUpdaterV1, /// the da source adapter handle da_source_adapter_handle: DaSourceService, /// The da source channel da_source_channel: Receiver, + /// Buffer of block costs from the DA chain + da_block_costs_buffer: Vec, + /// Storage transaction provider for metadata and unrecorded blocks + storage_tx_provider: StorageTxProvider, } -impl GasPriceServiceV1 +impl GasPriceServiceV1 where L2: L2BlockSource, - Metadata: MetadataStorage, DA: DaBlockCostsSource, + AtomicStorage: GasPriceServiceAtomicStorage, { - async fn process_l2_block_res( + async fn commit_block_data_to_algorithm( &mut self, l2_block_res: GasPriceResult, ) -> anyhow::Result<()> { @@ -84,42 +95,30 @@ where self.apply_block_info_to_gas_algorithm(block).await?; Ok(()) } - - async fn process_da_block_costs_res( - &mut self, - da_block_costs: Result, - ) -> anyhow::Result<()> { - tracing::info!("Received DA block costs: {:?}", da_block_costs); - let da_block_costs = da_block_costs?; - - tracing::debug!("Updating DA block costs"); - self.apply_da_block_costs_to_gas_algorithm(da_block_costs) - .await?; - Ok(()) - } } -impl GasPriceServiceV1 +impl GasPriceServiceV1 where - Metadata: MetadataStorage, DA: DaBlockCostsSource, + AtomicStorage: GasPriceServiceAtomicStorage, { pub fn new( l2_block_source: L2, - metadata_storage: Metadata, shared_algo: SharedV1Algorithm, algorithm_updater: AlgorithmUpdaterV1, da_source_adapter_handle: DaSourceService, + storage_tx_provider: AtomicStorage, ) -> Self { let da_source_channel = da_source_adapter_handle.shared_data().clone().subscribe(); Self { shared_algo, l2_block_source, - metadata_storage, algorithm_updater, da_source_adapter_handle, da_source_channel, + da_block_costs_buffer: Vec::new(), + storage_tx_provider, } } @@ -131,25 +130,22 @@ where self.shared_algo.clone() } + #[cfg(test)] + pub fn storage_tx_provider(&self) -> &AtomicStorage { + &self.storage_tx_provider + } + async fn update(&mut self, new_algorithm: AlgorithmV1) { self.shared_algo.update(new_algorithm).await; } fn validate_block_gas_capacity( - &self, block_gas_capacity: u64, ) -> anyhow::Result { NonZeroU64::new(block_gas_capacity) .ok_or_else(|| anyhow!("Block gas capacity must be non-zero")) } - async fn set_metadata(&mut self) -> anyhow::Result<()> { - let metadata: UpdaterMetadata = self.algorithm_updater.clone().into(); - self.metadata_storage - .set_metadata(&metadata) - .map_err(|err| anyhow!(err)) - } - async fn handle_normal_block( &mut self, height: u32, @@ -158,7 +154,29 @@ where block_bytes: u64, block_fees: u64, ) -> anyhow::Result<()> { - let capacity = self.validate_block_gas_capacity(block_gas_capacity)?; + let capacity = Self::validate_block_gas_capacity(block_gas_capacity)?; + let mut storage_tx = self.storage_tx_provider.begin_transaction()?; + let prev_height = height.saturating_sub(1); + let mut bundle_id = storage_tx + .get_bundle_id(&BlockHeight::from(prev_height)) + .map_err(|err| anyhow!(err))?; + + for da_block_costs in &self.da_block_costs_buffer { + tracing::debug!("Updating DA block costs: {:?}", da_block_costs); + self.algorithm_updater.update_da_record_data( + &da_block_costs.l2_blocks, + da_block_costs.bundle_size_bytes, + da_block_costs.blob_cost_wei, + &mut storage_tx.as_unrecorded_blocks(), + )?; + bundle_id = Some(da_block_costs.bundle_id); + } + + if let Some(bundle_id) = bundle_id { + storage_tx + .set_bundle_id(&BlockHeight::from(height), bundle_id) + .map_err(|err| anyhow!(err))?; + } self.algorithm_updater.update_l2_block_data( height, @@ -166,23 +184,18 @@ where capacity, block_bytes, block_fees as u128, + &mut storage_tx.as_unrecorded_blocks(), )?; - self.set_metadata().await?; - Ok(()) - } - - async fn handle_da_block_costs( - &mut self, - da_block_costs: DaBlockCosts, - ) -> anyhow::Result<()> { - self.algorithm_updater.update_da_record_data( - &da_block_costs.l2_blocks, - da_block_costs.blob_size_bytes, - da_block_costs.blob_cost_wei, - )?; - - self.set_metadata().await?; + let metadata = self.algorithm_updater.clone().into(); + storage_tx + .set_metadata(&metadata) + .map_err(|err| anyhow!(err))?; + AtomicStorage::commit_transaction(storage_tx)?; + let new_algo = self.algorithm_updater.algorithm(); + self.shared_algo.update(new_algo).await; + // Clear the buffer after committing changes + self.da_block_costs_buffer.clear(); Ok(()) } @@ -192,7 +205,12 @@ where ) -> anyhow::Result<()> { match l2_block { BlockInfo::GenesisBlock => { - self.set_metadata().await?; + let metadata: UpdaterMetadata = self.algorithm_updater.clone().into(); + let mut tx = self.storage_tx_provider.begin_transaction()?; + tx.set_metadata(&metadata).map_err(|err| anyhow!(err))?; + AtomicStorage::commit_transaction(tx)?; + let new_algo = self.algorithm_updater.algorithm(); + self.shared_algo.update(new_algo).await; } BlockInfo::Block { height, @@ -212,26 +230,16 @@ where } } - self.update(self.algorithm_updater.algorithm()).await; - Ok(()) - } - - async fn apply_da_block_costs_to_gas_algorithm( - &mut self, - da_block_costs: DaBlockCosts, - ) -> anyhow::Result<()> { - self.handle_da_block_costs(da_block_costs).await?; - self.update(self.algorithm_updater.algorithm()).await; Ok(()) } } #[async_trait] -impl RunnableTask for GasPriceServiceV1 +impl RunnableTask for GasPriceServiceV1 where L2: L2BlockSource, - Metadata: MetadataStorage, DA: DaBlockCostsSource, + AtomicStorage: GasPriceServiceAtomicStorage, { async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction { tokio::select! { @@ -241,12 +249,22 @@ where TaskNextAction::Stop } l2_block_res = self.l2_block_source.get_l2_block() => { - let res = self.process_l2_block_res(l2_block_res).await; + tracing::debug!("Received L2 block result: {:?}", l2_block_res); + let res = self.commit_block_data_to_algorithm(l2_block_res).await; TaskNextAction::always_continue(res) } - da_block_costs = self.da_source_channel.recv() => { - let res = self.process_da_block_costs_res(da_block_costs).await; - TaskNextAction::always_continue(res) + da_block_costs_res = self.da_source_channel.recv() => { + tracing::debug!("Received DA block costs: {:?}", da_block_costs_res); + match da_block_costs_res { + Ok(da_block_costs) => { + self.da_block_costs_buffer.push(da_block_costs); + TaskNextAction::Continue + }, + Err(err) => { + let err = anyhow!("Error receiving DA block costs: {:?}", err); + TaskNextAction::ErrorContinue(err) + } + } } } } @@ -258,12 +276,6 @@ where self.apply_block_info_to_gas_algorithm(block).await?; } - while let Ok(da_block_costs) = self.da_source_channel.try_recv() { - tracing::debug!("Updating DA block costs"); - self.apply_da_block_costs_to_gas_algorithm(da_block_costs) - .await?; - } - // run shutdown hooks for internal services self.da_source_adapter_handle.shutdown().await?; @@ -295,20 +307,18 @@ pub fn initialize_algorithm( metadata_storage: &Metadata, ) -> crate::common::utils::Result<(AlgorithmUpdaterV1, SharedV1Algorithm)> where - Metadata: MetadataStorage, + Metadata: GetMetadataStorage, { - let algorithm_updater; - if let Some(updater_metadata) = metadata_storage + let algorithm_updater = if let Some(updater_metadata) = metadata_storage .get_metadata(&latest_block_height.into()) .map_err(|err| { crate::common::utils::Error::CouldNotInitUpdater(anyhow::anyhow!(err)) - })? - { + })? { let v1_metadata = convert_to_v1_metadata(updater_metadata, config)?; - algorithm_updater = v1_algorithm_from_metadata(v1_metadata, config); + v1_algorithm_from_metadata(v1_metadata, config) } else { - algorithm_updater = AlgorithmUpdaterV1::from(config); - } + updater_from_config(config) + }; let shared_algo = SharedGasPriceAlgo::new_with_algorithm(algorithm_updater.algorithm()); @@ -320,8 +330,38 @@ where #[allow(non_snake_case)] #[cfg(test)] mod tests { + use std::{ + num::NonZeroU64, + sync::Arc, + time::Duration, + }; + + use tokio::sync::mpsc; + + use fuel_core_services::{ + RunnableTask, + StateWatcher, + }; + use fuel_core_storage::{ + structured_storage::test::InMemoryStorage, + transactional::{ + IntoTransaction, + StorageTransaction, + WriteTransaction, + }, + StorageAsMut, + }; + use fuel_core_types::fuel_types::BlockHeight; + use crate::{ common::{ + fuel_core_storage_adapter::storage::{ + BundleIdTable, + GasPriceColumn, + GasPriceColumn::UnrecordedBlocks, + UnrecordedBlocksTable, + }, + gas_price_algorithm::SharedGasPriceAlgo, l2_block_source::L2BlockSource, updater_metadata::UpdaterMetadata, utils::{ @@ -329,31 +369,27 @@ mod tests { Result as GasPriceResult, }, }, - ports::MetadataStorage, + ports::{ + GetMetadataStorage, + SetMetadataStorage, + }, v1::{ da_source_service::{ dummy_costs::DummyDaBlockCosts, service::DaSourceService, DaBlockCosts, }, - metadata::V1AlgorithmConfig, + metadata::{ + updater_from_config, + V1AlgorithmConfig, + }, service::{ initialize_algorithm, GasPriceServiceV1, }, + uninitialized_task::fuel_storage_unrecorded_blocks::FuelStorageUnrecordedBlocks, }, }; - use fuel_core_services::{ - RunnableTask, - StateWatcher, - }; - use fuel_core_types::fuel_types::BlockHeight; - use std::{ - num::NonZeroU64, - sync::Arc, - time::Duration, - }; - use tokio::sync::mpsc; struct FakeL2BlockSource { l2_block: mpsc::Receiver, @@ -379,7 +415,14 @@ mod tests { } } - impl MetadataStorage for FakeMetadata { + impl SetMetadataStorage for FakeMetadata { + fn set_metadata(&mut self, metadata: &UpdaterMetadata) -> GasPriceResult<()> { + *self.inner.lock().unwrap() = Some(metadata.clone()); + Ok(()) + } + } + + impl GetMetadataStorage for FakeMetadata { fn get_metadata( &self, _: &BlockHeight, @@ -387,11 +430,9 @@ mod tests { let metadata = self.inner.lock().unwrap().clone(); Ok(metadata) } - - fn set_metadata(&mut self, metadata: &UpdaterMetadata) -> GasPriceResult<()> { - *self.inner.lock().unwrap() = Some(metadata.clone()); - Ok(()) - } + } + fn database() -> StorageTransaction> { + InMemoryStorage::default().into_transaction() } #[tokio::test] @@ -427,8 +468,8 @@ mod tests { capped_range_size: 100, decrease_range_size: 4, block_activity_threshold: 20, - unrecorded_blocks: vec![], }; + let inner = database(); let (algo_updater, shared_algo) = initialize_algorithm(&config, l2_block_height, &metadata_storage).unwrap(); @@ -443,10 +484,10 @@ mod tests { let mut service = GasPriceServiceV1::new( l2_block_source, - metadata_storage, shared_algo, algo_updater, dummy_da_source, + inner, ); let read_algo = service.next_block_algorithm(); let mut watcher = StateWatcher::default(); @@ -465,8 +506,8 @@ mod tests { #[tokio::test] async fn run__updates_gas_price_with_da_block_cost_source() { // given - let block_height = 1; - let l2_block = BlockInfo::Block { + let block_height = 2; + let l2_block_2 = BlockInfo::Block { height: block_height, gas_used: 60, block_gas_capacity: 100, @@ -480,45 +521,56 @@ mod tests { }; let metadata_storage = FakeMetadata::empty(); + // Configured so exec gas price doesn't change, only da gas price let config = V1AlgorithmConfig { new_exec_gas_price: 100, min_exec_gas_price: 50, - exec_gas_price_change_percent: 20, + exec_gas_price_change_percent: 0, l2_block_fullness_threshold_percent: 20, gas_price_factor: NonZeroU64::new(10).unwrap(), - min_da_gas_price: 100, - max_da_gas_price_change_percent: 50, + min_da_gas_price: 0, + max_da_gas_price_change_percent: 100, da_p_component: 4, da_d_component: 2, normal_range_size: 10, capped_range_size: 100, decrease_range_size: 4, block_activity_threshold: 20, - unrecorded_blocks: vec![(1, 100)], }; - let (algo_updater, shared_algo) = - initialize_algorithm(&config, block_height, &metadata_storage).unwrap(); + let mut inner = database(); + let mut tx = inner.write_transaction(); + tx.storage_as_mut::() + .insert(&BlockHeight::from(1), &100) + .unwrap(); + tx.commit().unwrap(); + let mut algo_updater = updater_from_config(&config); + let shared_algo = + SharedGasPriceAlgo::new_with_algorithm(algo_updater.algorithm()); + algo_updater.l2_block_height = block_height - 1; + algo_updater.last_profit = 10_000; + algo_updater.new_scaled_da_gas_price = 10_000_000; let notifier = Arc::new(tokio::sync::Notify::new()); let da_source = DaSourceService::new( DummyDaBlockCosts::new( Ok(DaBlockCosts { + bundle_id: 1, l2_blocks: (1..2).collect(), blob_cost_wei: 9000, - blob_size_bytes: 3000, + bundle_size_bytes: 3000, }), notifier.clone(), ), Some(Duration::from_millis(1)), ); - let mut watcher = StateWatcher::default(); + let mut watcher = StateWatcher::started(); let mut service = GasPriceServiceV1::new( l2_block_source, - metadata_storage, shared_algo, algo_updater, da_source, + inner, ); let read_algo = service.next_block_algorithm(); let initial_price = read_algo.next_gas_price(); @@ -532,6 +584,10 @@ mod tests { .run(&mut da_source_watcher) .await; + service.run(&mut watcher).await; + tokio::time::sleep(Duration::from_millis(100)).await; + l2_block_sender.send(l2_block_2).await.unwrap(); + // when service.run(&mut watcher).await; tokio::time::sleep(Duration::from_millis(100)).await; @@ -541,4 +597,110 @@ mod tests { let actual_price = read_algo.next_gas_price(); assert_ne!(initial_price, actual_price); } + + fn arbitrary_v1_algorithm_config() -> V1AlgorithmConfig { + V1AlgorithmConfig { + new_exec_gas_price: 100, + min_exec_gas_price: 50, + exec_gas_price_change_percent: 0, + l2_block_fullness_threshold_percent: 20, + gas_price_factor: NonZeroU64::new(10).unwrap(), + min_da_gas_price: 0, + max_da_gas_price_change_percent: 100, + da_p_component: 4, + da_d_component: 2, + normal_range_size: 10, + capped_range_size: 100, + decrease_range_size: 4, + block_activity_threshold: 20, + } + } + + #[tokio::test] + async fn run__responses_from_da_service_update_bundle_id_in_storage() { + // given + let bundle_id = 1234; + let block_height = 2; + let l2_block_2 = BlockInfo::Block { + height: block_height, + gas_used: 60, + block_gas_capacity: 100, + block_bytes: 100, + block_fees: 100, + }; + + let (l2_block_sender, l2_block_receiver) = mpsc::channel(1); + let l2_block_source = FakeL2BlockSource { + l2_block: l2_block_receiver, + }; + + let metadata_storage = FakeMetadata::empty(); + // Configured so exec gas price doesn't change, only da gas price + let config = arbitrary_v1_algorithm_config(); + let mut inner = database(); + let mut tx = inner.write_transaction(); + tx.storage_as_mut::() + .insert(&BlockHeight::from(1), &100) + .unwrap(); + tx.commit().unwrap(); + let mut algo_updater = updater_from_config(&config); + let shared_algo = + SharedGasPriceAlgo::new_with_algorithm(algo_updater.algorithm()); + algo_updater.l2_block_height = block_height - 1; + algo_updater.last_profit = 10_000; + algo_updater.new_scaled_da_gas_price = 10_000_000; + + let notifier = Arc::new(tokio::sync::Notify::new()); + let da_source = DaSourceService::new( + DummyDaBlockCosts::new( + Ok(DaBlockCosts { + bundle_id, + l2_blocks: (1..2).collect(), + blob_cost_wei: 9000, + bundle_size_bytes: 3000, + }), + notifier.clone(), + ), + Some(Duration::from_millis(1)), + ); + let mut watcher = StateWatcher::started(); + + let mut service = GasPriceServiceV1::new( + l2_block_source, + shared_algo, + algo_updater, + da_source, + inner, + ); + let read_algo = service.next_block_algorithm(); + let initial_price = read_algo.next_gas_price(); + + // the RunnableTask depends on the handle passed to it for the da block cost source to already be running, + // which is the responsibility of the UninitializedTask in the `into_task` method of the RunnableService + // here we mimic that behaviour by running the da block cost service. + let mut da_source_watcher = StateWatcher::started(); + service + .da_source_adapter_handle + .run(&mut da_source_watcher) + .await; + + service.run(&mut watcher).await; + tokio::time::sleep(Duration::from_millis(100)).await; + l2_block_sender.send(l2_block_2).await.unwrap(); + + // when + service.run(&mut watcher).await; + tokio::time::sleep(Duration::from_millis(100)).await; + + // then + let latest_bundle_id = service + .storage_tx_provider + .storage::() + .get(&BlockHeight::from(block_height)) + .unwrap() + .unwrap(); + assert_eq!(*latest_bundle_id, bundle_id); + + service.shutdown().await.unwrap(); + } } diff --git a/crates/services/gas_price_service/src/v1/tests.rs b/crates/services/gas_price_service/src/v1/tests.rs index e5188a9c869..f5524508edc 100644 --- a/crates/services/gas_price_service/src/v1/tests.rs +++ b/crates/services/gas_price_service/src/v1/tests.rs @@ -2,6 +2,10 @@ use crate::{ common::{ fuel_core_storage_adapter::{ + storage::{ + GasPriceColumn, + GasPriceMetadata, + }, GasPriceSettings, GasPriceSettingsProvider, }, @@ -15,10 +19,15 @@ use crate::{ }, ports::{ GasPriceData, + GasPriceServiceAtomicStorage, + GetDaBundleId, + GetMetadataStorage, L2Data, - MetadataStorage, + SetDaBundleId, + SetMetadataStorage, }, v1::{ + algorithm::SharedV1Algorithm, da_source_service::{ service::{ DaBlockCostsSource, @@ -27,6 +36,7 @@ use crate::{ DaBlockCosts, }, metadata::{ + updater_from_config, V1AlgorithmConfig, V1Metadata, }, @@ -34,10 +44,16 @@ use crate::{ initialize_algorithm, GasPriceServiceV1, }, - uninitialized_task::UninitializedTask, + uninitialized_task::{ + fuel_storage_unrecorded_blocks::AsUnrecordedBlocks, + UninitializedTask, + }, }, }; -use anyhow::anyhow; +use anyhow::{ + anyhow, + Result, +}; use fuel_core_services::{ stream::{ BoxStream, @@ -47,14 +63,23 @@ use fuel_core_services::{ StateWatcher, }; use fuel_core_storage::{ - transactional::AtomicView, + iter::IteratorOverTable, + structured_storage::test::InMemoryStorage, + transactional::{ + AtomicView, + IntoTransaction, + StorageTransaction, + WriteTransaction, + }, Result as StorageResult, + StorageAsMut, }; use fuel_core_types::{ blockchain::{ block::Block, header::ConsensusParametersVersion, }, + fuel_asm::op::exp, fuel_tx::Transaction, fuel_types::BlockHeight, services::block_importer::{ @@ -62,11 +87,20 @@ use fuel_core_types::{ SharedImportResult, }, }; -use fuel_gas_price_algorithm::v1::AlgorithmUpdaterV1; +use fuel_gas_price_algorithm::v1::{ + AlgorithmUpdaterV1, + Bytes, + Error, + Height, + UnrecordedBlocks, +}; use std::{ num::NonZeroU64, ops::Deref, - sync::Arc, + sync::{ + Arc, + Mutex, + }, }; use tokio::sync::mpsc::Receiver; @@ -94,27 +128,23 @@ impl FakeMetadata { } } -impl MetadataStorage for FakeMetadata { - fn get_metadata(&self, _: &BlockHeight) -> GasPriceResult> { - let metadata = self.inner.lock().unwrap().clone(); - Ok(metadata) - } - +impl SetMetadataStorage for FakeMetadata { fn set_metadata(&mut self, metadata: &UpdaterMetadata) -> GasPriceResult<()> { *self.inner.lock().unwrap() = Some(metadata.clone()); Ok(()) } } -struct ErroringMetadata; - -impl MetadataStorage for ErroringMetadata { +impl GetMetadataStorage for FakeMetadata { fn get_metadata(&self, _: &BlockHeight) -> GasPriceResult> { - Err(GasPriceError::CouldNotFetchMetadata { - source_error: anyhow!("boo!"), - }) + let metadata = self.inner.lock().unwrap().clone(); + Ok(metadata) } +} +struct ErroringPersistedData; + +impl SetMetadataStorage for ErroringPersistedData { fn set_metadata(&mut self, _: &UpdaterMetadata) -> GasPriceResult<()> { Err(GasPriceError::CouldNotSetMetadata { block_height: Default::default(), @@ -123,8 +153,88 @@ impl MetadataStorage for ErroringMetadata { } } +impl GetMetadataStorage for ErroringPersistedData { + fn get_metadata(&self, _: &BlockHeight) -> GasPriceResult> { + Err(GasPriceError::CouldNotFetchMetadata { + source_error: anyhow!("boo!"), + }) + } +} + +impl GetDaBundleId for ErroringPersistedData { + fn get_bundle_id(&self, _block_height: &BlockHeight) -> GasPriceResult> { + Err(GasPriceError::CouldNotFetchDARecord(anyhow!("boo!"))) + } +} + +struct UnimplementedStorageTx; + +impl GasPriceServiceAtomicStorage for ErroringPersistedData { + type Transaction<'a> = UnimplementedStorageTx; + + fn begin_transaction(&mut self) -> GasPriceResult> { + todo!() + } + + fn commit_transaction(_transaction: Self::Transaction<'_>) -> GasPriceResult<()> { + todo!() + } +} + +impl SetMetadataStorage for UnimplementedStorageTx { + fn set_metadata(&mut self, _metadata: &UpdaterMetadata) -> GasPriceResult<()> { + unimplemented!() + } +} + +impl GetMetadataStorage for UnimplementedStorageTx { + fn get_metadata( + &self, + _block_height: &BlockHeight, + ) -> GasPriceResult> { + unimplemented!() + } +} + +impl UnrecordedBlocks for UnimplementedStorageTx { + fn insert(&mut self, _height: Height, _bytes: Bytes) -> Result<(), String> { + unimplemented!() + } + + fn remove(&mut self, _height: &Height) -> Result, String> { + unimplemented!() + } +} + +impl SetDaBundleId for UnimplementedStorageTx { + fn set_bundle_id( + &mut self, + _block_height: &BlockHeight, + _bundle_id: u32, + ) -> GasPriceResult<()> { + unimplemented!() + } +} + +impl GetDaBundleId for UnimplementedStorageTx { + fn get_bundle_id(&self, _block_height: &BlockHeight) -> GasPriceResult> { + unimplemented!() + } +} + +impl AsUnrecordedBlocks for UnimplementedStorageTx { + type Wrapper<'a> = UnimplementedStorageTx + where + Self: 'a; + + fn as_unrecorded_blocks(&mut self) -> Self::Wrapper<'_> { + UnimplementedStorageTx + } +} + struct FakeDABlockCost { da_block_costs: Receiver, + bundle_id: Arc>>, } impl FakeDABlockCost { @@ -132,20 +242,39 @@ impl FakeDABlockCost { let (_sender, receiver) = tokio::sync::mpsc::channel(1); Self { da_block_costs: receiver, + bundle_id: Arc::new(Mutex::new(None)), } } fn new(da_block_costs: Receiver) -> Self { - Self { da_block_costs } + Self { + da_block_costs, + bundle_id: Arc::new(Mutex::new(None)), + } + } + + fn never_returns_with_handle_to_bundle_id() -> (Self, Arc>>) { + let (_sender, receiver) = tokio::sync::mpsc::channel(1); + let bundle_id = Arc::new(Mutex::new(None)); + let service = Self { + da_block_costs: receiver, + bundle_id: bundle_id.clone(), + }; + (service, bundle_id) } } #[async_trait::async_trait] impl DaBlockCostsSource for FakeDABlockCost { - async fn request_da_block_cost(&mut self) -> anyhow::Result { + async fn request_da_block_cost(&mut self) -> Result { let costs = self.da_block_costs.recv().await.unwrap(); Ok(costs) } + + async fn set_last_value(&mut self, bundle_id: u32) -> Result<()> { + self.bundle_id.lock().unwrap().replace(bundle_id); + Ok(()) + } } fn zero_threshold_arbitrary_config() -> V1AlgorithmConfig { @@ -163,7 +292,6 @@ fn zero_threshold_arbitrary_config() -> V1AlgorithmConfig { capped_range_size: 0, decrease_range_size: 0, block_activity_threshold: 0, - unrecorded_blocks: vec![], } } @@ -178,7 +306,7 @@ fn arbitrary_metadata() -> V1Metadata { last_profit: 0, second_to_last_profit: 0, latest_da_cost_per_byte: 0, - unrecorded_blocks: vec![], + unrecorded_block_bytes: 0, } } @@ -197,10 +325,27 @@ fn different_arb_config() -> V1AlgorithmConfig { capped_range_size: 0, decrease_range_size: 0, block_activity_threshold: 0, - unrecorded_blocks: vec![], } } +fn database() -> StorageTransaction> { + InMemoryStorage::default().into_transaction() +} + +fn database_with_metadata( + metadata: &V1Metadata, +) -> StorageTransaction> { + let mut db = database(); + let mut tx = db.write_transaction(); + let height = metadata.l2_block_height.into(); + let metadata = UpdaterMetadata::V1(metadata.clone()); + tx.storage_as_mut::() + .insert(&height, &metadata) + .unwrap(); + tx.commit().unwrap(); + db +} + #[tokio::test] async fn next_gas_price__affected_by_new_l2_block() { // given @@ -219,16 +364,17 @@ async fn next_gas_price__affected_by_new_l2_block() { let config = zero_threshold_arbitrary_config(); let height = 0; + let inner = database(); let (algo_updater, shared_algo) = initialize_algorithm(&config, height, &metadata_storage).unwrap(); let da_source = FakeDABlockCost::never_returns(); let da_source_service = DaSourceService::new(da_source, None); let mut service = GasPriceServiceV1::new( l2_block_source, - metadata_storage, shared_algo, algo_updater, da_source_service, + inner, ); let read_algo = service.next_block_algorithm(); @@ -248,8 +394,9 @@ async fn next_gas_price__affected_by_new_l2_block() { #[tokio::test] async fn run__new_l2_block_saves_old_metadata() { // given + let height = 1; let l2_block = BlockInfo::Block { - height: 1, + height, gas_used: 60, block_gas_capacity: 100, block_bytes: 100, @@ -259,34 +406,36 @@ async fn run__new_l2_block_saves_old_metadata() { let l2_block_source = FakeL2BlockSource { l2_block: l2_block_receiver, }; - let metadata_inner = Arc::new(std::sync::Mutex::new(None)); - let metadata_storage = FakeMetadata { - inner: metadata_inner.clone(), - }; let config = zero_threshold_arbitrary_config(); - let height = 0; - let (algo_updater, shared_algo) = - initialize_algorithm(&config, height, &metadata_storage).unwrap(); + let inner = database(); + let algo_updater = updater_from_config(&config); + let shared_algo = SharedV1Algorithm::new_with_algorithm(algo_updater.algorithm()); let da_source = FakeDABlockCost::never_returns(); let da_source_service = DaSourceService::new(da_source, None); let mut service = GasPriceServiceV1::new( l2_block_source, - metadata_storage, shared_algo, algo_updater, da_source_service, + inner, ); - let mut watcher = StateWatcher::default(); + let mut watcher = StateWatcher::started(); // when - service.run(&mut watcher).await; l2_block_sender.send(l2_block).await.unwrap(); - service.shutdown().await.unwrap(); + service.run(&mut watcher).await; // then - let metadata_is_some = metadata_inner.lock().unwrap().is_some(); - assert!(metadata_is_some) + let metadata_is_some = service + .storage_tx_provider() + .get_metadata(&height.into()) + .unwrap() + .is_some(); + assert!(metadata_is_some); + + // cleanup + service.shutdown().await.unwrap(); } #[derive(Clone)] @@ -302,12 +451,26 @@ impl GasPriceSettingsProvider for FakeSettings { } #[derive(Clone)] -struct FakeGasPriceDb; +struct FakeGasPriceDb { + height: Option, +} + +impl FakeGasPriceDb { + fn new(height: u32) -> Self { + Self { + height: Some(height.into()), + } + } + + fn empty() -> Self { + Self { height: None } + } +} // GasPriceData + Modifiable + KeyValueInspect impl GasPriceData for FakeGasPriceDb { fn latest_height(&self) -> Option { - unimplemented!() + self.height } } @@ -373,22 +536,20 @@ async fn uninitialized_task__new__if_exists_already_reload_old_values_with_overr let descaleed_exec_price = original_metadata.new_scaled_exec_price / original_metadata.gas_price_factor; assert_ne!(different_config.new_exec_gas_price, descaleed_exec_price); - let different_l2_block = 1231; - assert_ne!(different_l2_block, original_metadata.l2_block_height); + let different_l2_block = 0; let settings = FakeSettings; let block_stream = empty_block_stream(); - let gas_price_db = FakeGasPriceDb; let on_chain_db = FakeOnChainDb::new(different_l2_block); let da_cost_source = FakeDABlockCost::never_returns(); - + let inner = database_with_metadata(&original_metadata); // when let service = UninitializedTask::new( - different_config, + different_config.clone(), + None, 0.into(), settings, block_stream, - gas_price_db, - metadata_storage, + inner, da_cost_source, on_chain_db, ) @@ -396,11 +557,15 @@ async fn uninitialized_task__new__if_exists_already_reload_old_values_with_overr // then let UninitializedTask { algo_updater, .. } = service; - algo_updater_matches_values_from_old_metadata(algo_updater, original_metadata); + algo_updater_matches_values_from_old_metadata( + algo_updater.clone(), + original_metadata.clone(), + ); + algo_updater_override_values_match(algo_updater, different_config); } fn algo_updater_matches_values_from_old_metadata( - algo_updater: AlgorithmUpdaterV1, + mut algo_updater: AlgorithmUpdaterV1, original_metadata: V1Metadata, ) { let V1Metadata { @@ -413,7 +578,7 @@ fn algo_updater_matches_values_from_old_metadata( last_profit: original_last_profit, second_to_last_profit: original_second_to_last_profit, latest_da_cost_per_byte: original_latest_da_cost_per_byte, - unrecorded_blocks: original_unrecorded_blocks, + unrecorded_block_bytes: original_unrecorded_block_bytes, } = original_metadata; assert_eq!( algo_updater.new_scaled_exec_price, @@ -443,34 +608,53 @@ fn algo_updater_matches_values_from_old_metadata( original_latest_da_cost_per_byte ); assert_eq!( - algo_updater - .unrecorded_blocks - .into_iter() - .collect::>(), - original_unrecorded_blocks.into_iter().collect::>() + algo_updater.unrecorded_blocks_bytes, + original_unrecorded_block_bytes ); } +fn algo_updater_override_values_match( + algo_updater: AlgorithmUpdaterV1, + config: V1AlgorithmConfig, +) { + assert_eq!(algo_updater.min_exec_gas_price, config.min_exec_gas_price); + assert_eq!( + algo_updater.exec_gas_price_change_percent, + config.exec_gas_price_change_percent + ); + assert_eq!( + algo_updater.l2_block_fullness_threshold_percent, + config.l2_block_fullness_threshold_percent.into() + ); + assert_eq!(algo_updater.gas_price_factor, config.gas_price_factor); + assert_eq!(algo_updater.min_da_gas_price, config.min_da_gas_price); + assert_eq!( + algo_updater.max_da_gas_price_change_percent, + config.max_da_gas_price_change_percent + ); + assert_eq!(algo_updater.da_p_component, config.da_p_component); + assert_eq!(algo_updater.da_d_component, config.da_d_component); +} + #[tokio::test] async fn uninitialized_task__new__should_fail_if_cannot_fetch_metadata() { // given let config = zero_threshold_arbitrary_config(); let different_l2_block = 1231; - let metadata_storage = ErroringMetadata; + let erroring_persisted_data = ErroringPersistedData; let settings = FakeSettings; let block_stream = empty_block_stream(); - let gas_price_db = FakeGasPriceDb; let on_chain_db = FakeOnChainDb::new(different_l2_block); let da_cost_source = FakeDABlockCost::never_returns(); // when let res = UninitializedTask::new( config, + None, 0.into(), settings, block_stream, - gas_price_db, - metadata_storage, + erroring_persisted_data, da_cost_source, on_chain_db, ); @@ -479,3 +663,45 @@ async fn uninitialized_task__new__should_fail_if_cannot_fetch_metadata() { let is_err = res.is_err(); assert!(is_err); } + +#[tokio::test] +async fn uninitialized_task__init__starts_da_service_with_bundle_id_in_storage() { + // given + let block_height = 1; + let bundle_id: u32 = 123; + let original_metadata = arbitrary_metadata(); + + let different_config = different_arb_config(); + let descaleed_exec_price = + original_metadata.new_scaled_exec_price / original_metadata.gas_price_factor; + assert_ne!(different_config.new_exec_gas_price, descaleed_exec_price); + let different_l2_block = 0; + let settings = FakeSettings; + let block_stream = empty_block_stream(); + let on_chain_db = FakeOnChainDb::new(different_l2_block); + let (da_cost_source, bundle_id_handle) = + FakeDABlockCost::never_returns_with_handle_to_bundle_id(); + let mut inner = database_with_metadata(&original_metadata); + let mut tx = inner.begin_transaction().unwrap(); + tx.set_bundle_id(&block_height.into(), bundle_id).unwrap(); + StorageTransaction::commit_transaction(tx).unwrap(); + let service = UninitializedTask::new( + different_config.clone(), + Some(block_height.into()), + 0.into(), + settings, + block_stream, + inner, + da_cost_source, + on_chain_db, + ) + .unwrap(); + + // when + service.init().await.unwrap(); + + // then + let actual = bundle_id_handle.lock().unwrap(); + let expected = Some(bundle_id); + assert_eq!(*actual, expected); +} diff --git a/crates/services/gas_price_service/src/v1/uninitialized_task.rs b/crates/services/gas_price_service/src/v1/uninitialized_task.rs index 2f5699c41f7..96da25427d4 100644 --- a/crates/services/gas_price_service/src/v1/uninitialized_task.rs +++ b/crates/services/gas_price_service/src/v1/uninitialized_task.rs @@ -18,15 +18,22 @@ use crate::{ }, ports::{ GasPriceData, + GasPriceServiceAtomicStorage, GasPriceServiceConfig, + GetDaBundleId, + GetMetadataStorage, L2Data, - MetadataStorage, + SetDaBundleId, + SetMetadataStorage, }, v1::{ algorithm::SharedV1Algorithm, - da_source_service::service::{ - DaBlockCostsSource, - DaSourceService, + da_source_service::{ + block_committer_costs::BlockCommitterDaBlockCosts, + service::{ + DaBlockCostsSource, + DaSourceService, + }, }, metadata::{ v1_algorithm_from_metadata, @@ -37,6 +44,10 @@ use crate::{ initialize_algorithm, GasPriceServiceV1, }, + uninitialized_task::fuel_storage_unrecorded_blocks::{ + AsUnrecordedBlocks, + FuelStorageUnrecordedBlocks, + }, }, }; use anyhow::Error; @@ -47,24 +58,32 @@ use fuel_core_services::{ StateWatcher, }; use fuel_core_storage::{ + kv_store::{ + KeyValueInspect, + KeyValueMutate, + }, not_found, - transactional::AtomicView, + transactional::{ + AtomicView, + Modifiable, + StorageTransaction, + }, }; use fuel_core_types::{ fuel_tx::field::MintAmount, fuel_types::BlockHeight, services::block_importer::SharedImportResult, }; -use fuel_gas_price_algorithm::v1::AlgorithmUpdaterV1; +use fuel_gas_price_algorithm::v1::{ + AlgorithmUpdaterV1, + UnrecordedBlocks, +}; -pub struct UninitializedTask< - L2DataStoreView, - GasPriceStore, - Metadata, - DA, - SettingsProvider, -> { +pub mod fuel_storage_unrecorded_blocks; + +pub struct UninitializedTask { pub config: V1AlgorithmConfig, + pub gas_metadata_height: Option, pub genesis_block_height: BlockHeight, pub settings: SettingsProvider, pub gas_price_db: GasPriceStore, @@ -72,28 +91,26 @@ pub struct UninitializedTask< pub block_stream: BoxStream, pub(crate) shared_algo: SharedV1Algorithm, pub(crate) algo_updater: AlgorithmUpdaterV1, - pub(crate) metadata_storage: Metadata, pub(crate) da_source: DA, } -impl - UninitializedTask +impl + UninitializedTask where L2DataStore: L2Data, L2DataStoreView: AtomicView, - GasPriceStore: GasPriceData, - Metadata: MetadataStorage, + AtomicStorage: GasPriceServiceAtomicStorage, DA: DaBlockCostsSource, SettingsProvider: GasPriceSettingsProvider, { #[allow(clippy::too_many_arguments)] pub fn new( config: V1AlgorithmConfig, + gas_metadata_height: Option, genesis_block_height: BlockHeight, settings: SettingsProvider, block_stream: BoxStream, - gas_price_db: GasPriceStore, - metadata_storage: Metadata, + gas_price_db: AtomicStorage, da_source: DA, on_chain_db: L2DataStoreView, ) -> anyhow::Result { @@ -104,10 +121,11 @@ where .into(); let (algo_updater, shared_algo) = - initialize_algorithm(&config, latest_block_height, &metadata_storage)?; + initialize_algorithm(&config, latest_block_height, &gas_price_db)?; let task = Self { config, + gas_metadata_height, genesis_block_height, settings, gas_price_db, @@ -115,16 +133,15 @@ where block_stream, algo_updater, shared_algo, - metadata_storage, da_source, }; Ok(task) } - pub fn init( + pub async fn init( mut self, ) -> anyhow::Result< - GasPriceServiceV1, Metadata, DA>, + GasPriceServiceV1, DA, AtomicStorage>, > { let mut first_run = false; let latest_block_height: u32 = self @@ -134,7 +151,7 @@ where .unwrap_or(self.genesis_block_height) .into(); - let maybe_metadata_height = self.gas_price_db.latest_height(); + let maybe_metadata_height = self.gas_metadata_height; let metadata_height = if let Some(metadata_height) = maybe_metadata_height { metadata_height.into() } else { @@ -151,6 +168,11 @@ where // TODO: Add to config // https://github.com/FuelLabs/fuel-core/issues/2140 let poll_interval = None; + if let Some(bundle_id) = + self.gas_price_db.get_bundle_id(&metadata_height.into())? + { + self.da_source.set_last_value(bundle_id).await?; + } let da_service = DaSourceService::new(self.da_source, poll_interval); if BlockHeight::from(latest_block_height) == self.genesis_block_height @@ -158,10 +180,10 @@ where { let service = GasPriceServiceV1::new( l2_block_source, - self.metadata_storage, self.shared_algo, self.algo_updater, da_service, + self.gas_price_db, ); Ok(service) } else { @@ -169,19 +191,19 @@ where sync_gas_price_db_with_on_chain_storage( &self.settings, &self.config, - &mut self.metadata_storage, &self.on_chain_db, metadata_height, latest_block_height, + &mut self.gas_price_db, )?; } let service = GasPriceServiceV1::new( l2_block_source, - self.metadata_storage, self.shared_algo, self.algo_updater, da_service, + self.gas_price_db, ); Ok(service) } @@ -189,20 +211,18 @@ where } #[async_trait::async_trait] -impl - RunnableService - for UninitializedTask +impl RunnableService + for UninitializedTask where L2DataStore: L2Data, L2DataStoreView: AtomicView, - GasPriceStore: GasPriceData, - Metadata: MetadataStorage, - DA: DaBlockCostsSource, - SettingsProvider: GasPriceSettingsProvider, + AtomicStorage: GasPriceServiceAtomicStorage + GasPriceData, + DA: DaBlockCostsSource + 'static, + SettingsProvider: GasPriceSettingsProvider + 'static, { const NAME: &'static str = "GasPriceServiceV1"; type SharedData = SharedV1Algorithm; - type Task = GasPriceServiceV1, Metadata, DA>; + type Task = GasPriceServiceV1, DA, AtomicStorage>; type TaskParams = (); fn shared_data(&self) -> Self::SharedData { @@ -214,30 +234,30 @@ where _state_watcher: &StateWatcher, _params: Self::TaskParams, ) -> anyhow::Result { - UninitializedTask::init(self) + UninitializedTask::init(self).await } } fn sync_gas_price_db_with_on_chain_storage< L2DataStore, L2DataStoreView, - Metadata, SettingsProvider, + AtomicStorage, >( settings: &SettingsProvider, config: &V1AlgorithmConfig, - metadata_storage: &mut Metadata, on_chain_db: &L2DataStoreView, metadata_height: u32, latest_block_height: u32, + persisted_data: &mut AtomicStorage, ) -> anyhow::Result<()> where L2DataStore: L2Data, L2DataStoreView: AtomicView, - Metadata: MetadataStorage, SettingsProvider: GasPriceSettingsProvider, + AtomicStorage: GasPriceServiceAtomicStorage, { - let metadata = metadata_storage + let metadata = persisted_data .get_metadata(&metadata_height.into())? .ok_or(anyhow::anyhow!( "Expected metadata to exist for height: {metadata_height}" @@ -257,28 +277,29 @@ where metadata_height, latest_block_height, &mut algo_updater, - metadata_storage, + persisted_data, )?; Ok(()) } -fn sync_v1_metadata( +fn sync_v1_metadata( settings: &SettingsProvider, on_chain_db: &L2DataStoreView, metadata_height: u32, latest_block_height: u32, updater: &mut AlgorithmUpdaterV1, - metadata_storage: &mut Metadata, + da_storage: &mut AtomicStorage, ) -> anyhow::Result<()> where L2DataStore: L2Data, L2DataStoreView: AtomicView, - Metadata: MetadataStorage, SettingsProvider: GasPriceSettingsProvider, + AtomicStorage: GasPriceServiceAtomicStorage, { let first = metadata_height.saturating_add(1); let view = on_chain_db.latest_view()?; + let mut tx = da_storage.begin_transaction()?; for height in first..=latest_block_height { let block = view .get_block(&height.into())? @@ -307,51 +328,43 @@ where block_gas_capacity, block_bytes, fee_wei.into(), + &mut tx.as_unrecorded_blocks(), )?; let metadata: UpdaterMetadata = updater.clone().into(); - metadata_storage.set_metadata(&metadata)?; + tx.set_metadata(&metadata)?; } + AtomicStorage::commit_transaction(tx)?; Ok(()) } #[allow(clippy::type_complexity)] -#[allow(clippy::too_many_arguments)] -pub fn new_gas_price_service_v1< - L2DataStore, - GasPriceStore, - Metadata, - DA, - SettingsProvider, ->( +pub fn new_gas_price_service_v1( v1_config: V1AlgorithmConfig, genesis_block_height: BlockHeight, settings: SettingsProvider, block_stream: BoxStream, - gas_price_db: GasPriceStore, - metadata: Metadata, + gas_price_db: AtomicStorage, da_source: DA, on_chain_db: L2DataStore, ) -> anyhow::Result< - ServiceRunner< - UninitializedTask, - >, + ServiceRunner>, > where L2DataStore: AtomicView, L2DataStore::LatestView: L2Data, - GasPriceStore: GasPriceData, + AtomicStorage: GasPriceServiceAtomicStorage + GasPriceData, SettingsProvider: GasPriceSettingsProvider, - Metadata: MetadataStorage, DA: DaBlockCostsSource, { + let metadata_height = gas_price_db.latest_height(); let gas_price_init = UninitializedTask::new( v1_config, + metadata_height, genesis_block_height, settings, block_stream, gas_price_db, - metadata, da_source, on_chain_db, )?; diff --git a/crates/services/gas_price_service/src/v1/uninitialized_task/fuel_storage_unrecorded_blocks.rs b/crates/services/gas_price_service/src/v1/uninitialized_task/fuel_storage_unrecorded_blocks.rs new file mode 100644 index 00000000000..61d6bae5528 --- /dev/null +++ b/crates/services/gas_price_service/src/v1/uninitialized_task/fuel_storage_unrecorded_blocks.rs @@ -0,0 +1,123 @@ +use crate::common::fuel_core_storage_adapter::storage::{ + GasPriceColumn, + UnrecordedBlocksTable, +}; +use fuel_core_storage::{ + kv_store::{ + KeyValueInspect, + KeyValueMutate, + }, + transactional::{ + Modifiable, + WriteTransaction, + }, + Error as StorageError, + StorageAsMut, + StorageAsRef, + StorageMutate, +}; +use fuel_core_types::fuel_merkle::storage::StorageMutateInfallible; +use fuel_gas_price_algorithm::{ + v1, + v1::UnrecordedBlocks, +}; + +pub trait AsUnrecordedBlocks { + type Wrapper<'a>: UnrecordedBlocks + where + Self: 'a; + + fn as_unrecorded_blocks(&mut self) -> Self::Wrapper<'_>; +} + +impl AsUnrecordedBlocks for S +where + S: StorageMutate, +{ + type Wrapper<'a> = FuelStorageUnrecordedBlocks<&'a mut Self> + where + Self: 'a; + + fn as_unrecorded_blocks(&mut self) -> Self::Wrapper<'_> { + FuelStorageUnrecordedBlocks::new(self) + } +} + +#[derive(Debug, Clone)] +pub struct FuelStorageUnrecordedBlocks { + inner: Storage, +} + +impl FuelStorageUnrecordedBlocks { + pub fn new(inner: Storage) -> Self { + Self { inner } + } +} + +impl UnrecordedBlocks for FuelStorageUnrecordedBlocks +where + S: StorageMutate, +{ + fn insert(&mut self, height: v1::Height, bytes: v1::Bytes) -> Result<(), String> { + self.inner + .storage_as_mut::() + .insert(&height.into(), &bytes) + .map_err(|err| format!("Error: {:?}", err))?; + Ok(()) + } + + fn remove(&mut self, height: &v1::Height) -> Result, String> { + let bytes = self + .inner + .storage_as_mut::() + .take(&(*height).into()) + .map_err(|err| format!("Error: {:?}", err))?; + Ok(bytes) + } +} + +#[allow(non_snake_case)] +#[cfg(test)] +mod tests { + use super::*; + use fuel_core_storage::{ + structured_storage::test::InMemoryStorage, + transactional::{ + IntoTransaction, + StorageTransaction, + }, + }; + + fn database() -> StorageTransaction> { + InMemoryStorage::default().into_transaction() + } + + #[test] + fn insert__remove__round_trip() { + // given + let mut storage = FuelStorageUnrecordedBlocks::new(database()); + let height = 8; + let bytes = 100; + + // when + storage.insert(height, bytes).unwrap(); + let actual = storage.remove(&height).unwrap(); + + // then + let expected = Some(bytes); + assert_eq!(expected, actual); + } + + #[test] + fn remove__if_not_inserted_returns_none() { + // given + let mut storage = FuelStorageUnrecordedBlocks::new(database()); + let height = 8; + + // when + let maybe_value = storage.remove(&height).unwrap(); + + // then + assert!(maybe_value.is_none()); + } +}