diff --git a/crates/fuel-core/src/service/sub_services.rs b/crates/fuel-core/src/service/sub_services.rs index 4fd45534b24..03003319235 100644 --- a/crates/fuel-core/src/service/sub_services.rs +++ b/crates/fuel-core/src/service/sub_services.rs @@ -182,7 +182,7 @@ pub fn init_sub_services( let genesis_block_height = *genesis_block.header().height(); let settings = consensus_parameters_provider.clone(); let block_stream = importer_adapter.events_shared_result(); - let metadata = StructuredStorage::new(database.gas_price().clone()); + let metadata = database.gas_price().clone(); let gas_price_service_v0 = new_gas_price_service_v0( config.clone().into(), @@ -190,7 +190,7 @@ pub fn init_sub_services( settings, block_stream, database.gas_price().clone(), - metadata, + StructuredStorage::new(metadata), database.on_chain().clone(), )?; diff --git a/crates/fuel-gas-price-algorithm/src/v1.rs b/crates/fuel-gas-price-algorithm/src/v1.rs index 10f71d1d3d9..ac670d7b38f 100644 --- a/crates/fuel-gas-price-algorithm/src/v1.rs +++ b/crates/fuel-gas-price-algorithm/src/v1.rs @@ -65,18 +65,20 @@ impl AlgorithmV1 { pub type Height = u32; pub type Bytes = u64; + pub trait UnrecordedBlocks { - fn insert(&mut self, height: Height, bytes: Bytes) -> Result<(), Error>; - fn remove(&mut self, height: &Height) -> Result, Error>; + fn insert(&mut self, height: Height, bytes: Bytes) -> Result<(), String>; + + fn remove(&mut self, height: &Height) -> Result, String>; } impl UnrecordedBlocks for BTreeMap { - fn insert(&mut self, height: Height, bytes: Bytes) -> Result<(), Error> { + fn insert(&mut self, height: Height, bytes: Bytes) -> Result<(), String> { self.insert(height, bytes); Ok(()) } - fn remove(&mut self, height: &Height) -> Result, Error> { + fn remove(&mut self, height: &Height) -> Result, String> { let value = self.remove(height); Ok(value) } @@ -395,7 +397,9 @@ impl AlgorithmUpdaterV1 { self.update_da_gas_price(); // metadata - unrecorded_blocks.insert(height, block_bytes)?; + unrecorded_blocks + .insert(height, block_bytes) + .map_err(Error::CouldNotInsertUnrecordedBlock)?; self.unrecorded_blocks_bytes = self .unrecorded_blocks_bytes .saturating_add(block_bytes as u128); @@ -563,7 +567,7 @@ impl AlgorithmUpdaterV1 { recording_cost: u128, unrecorded_blocks: &mut U, ) -> Result<(), Error> { - self.update_unrecorded_block_bytes(heights, unrecorded_blocks); + self.update_unrecorded_block_bytes(heights, unrecorded_blocks)?; let new_da_block_cost = self .latest_known_total_da_cost_excess @@ -589,26 +593,25 @@ impl AlgorithmUpdaterV1 { &mut self, heights: &[u32], unrecorded_blocks: &mut U, - ) { + ) -> Result<(), Error> { let mut total: u128 = 0; for expected_height in heights { - let res = unrecorded_blocks.remove(expected_height); - match res { - Ok(Some(bytes)) => { - total = total.saturating_add(bytes as u128); - } - Ok(None) => { - tracing::warn!( - "L2 block expected but not found in unrecorded blocks: {}", - expected_height, - ); - } - Err(err) => { - tracing::error!("Could not remove unrecorded block: {}", err); - } + let maybe_bytes = unrecorded_blocks + .remove(expected_height) + .map_err(Error::CouldNotRemoveUnrecordedBlock)?; + + if let Some(bytes) = maybe_bytes { + total = total.saturating_add(bytes as u128); + } else { + tracing::warn!( + "L2 block expected but not found in unrecorded blocks: {}", + expected_height, + ); } } self.unrecorded_blocks_bytes = self.unrecorded_blocks_bytes.saturating_sub(total); + + Ok(()) } fn recalculate_projected_cost(&mut self) { diff --git a/crates/services/gas_price_service/src/common/fuel_core_storage_adapter.rs b/crates/services/gas_price_service/src/common/fuel_core_storage_adapter.rs index f1c38d504d2..591707cb46b 100644 --- a/crates/services/gas_price_service/src/common/fuel_core_storage_adapter.rs +++ b/crates/services/gas_price_service/src/common/fuel_core_storage_adapter.rs @@ -1,29 +1,27 @@ -use crate::common::utils::{ - BlockInfo, - Error as GasPriceError, - Result as GasPriceResult, -}; -use anyhow::anyhow; -use fuel_core_types::fuel_types::BlockHeight; - use crate::{ common::{ fuel_core_storage_adapter::storage::{ GasPriceColumn, GasPriceMetadata, SequenceNumberTable, - UnrecordedBlocksTable, }, updater_metadata::UpdaterMetadata, + utils::{ + BlockInfo, + Error as GasPriceError, + Result as GasPriceResult, + }, }, ports::{ + GasPriceServiceAtomicStorage, GetDaSequenceNumber, GetMetadataStorage, SetDaSequenceNumber, SetMetadataStorage, - TransactionableStorage, }, }; +use anyhow::anyhow; +use core::cmp::min; use fuel_core_storage::{ codec::{ postcard::Postcard, @@ -36,14 +34,17 @@ use fuel_core_storage::{ StorageTransaction, WriteTransaction, }, + Error as StorageError, StorageAsMut, StorageAsRef, + StorageInspect, }; use fuel_core_types::{ blockchain::{ block::Block, header::ConsensusParametersVersion, }, + fuel_merkle::storage::StorageMutate, fuel_tx::{ field::{ MintAmount, @@ -51,14 +52,8 @@ use fuel_core_types::{ }, Transaction, }, + fuel_types::BlockHeight, }; -use fuel_gas_price_algorithm::v1::{ - Bytes, - Error, - Height, - UnrecordedBlocks, -}; -use std::cmp::min; #[cfg(test)] mod metadata_tests; @@ -67,8 +62,8 @@ pub mod storage; impl SetMetadataStorage for StructuredStorage where - Storage: KeyValueInspect + Modifiable, Storage: Send + Sync, + Storage: KeyValueInspect + Modifiable, { fn set_metadata(&mut self, metadata: &UpdaterMetadata) -> GasPriceResult<()> { let block_height = metadata.l2_block_height(); @@ -84,9 +79,10 @@ where } } -impl GetMetadataStorage for StructuredStorage +impl GetMetadataStorage for Storage where - Storage: KeyValueInspect + Send + Sync, + Storage: Send + Sync, + Storage: StorageInspect, { fn get_metadata( &self, @@ -102,9 +98,10 @@ where } } -impl GetDaSequenceNumber for StructuredStorage +impl GetDaSequenceNumber for Storage where - Storage: KeyValueInspect + Send + Sync, + Storage: Send + Sync, + Storage: StorageInspect, { fn get_sequence_number( &self, @@ -119,137 +116,50 @@ where } } -pub struct NewStorageTransaction<'a, Storage> { - inner: StorageTransaction<&'a mut StructuredStorage>, -} - -impl<'a, Storage> NewStorageTransaction<'a, Storage> { - fn wrap(inner: StorageTransaction<&'a mut StructuredStorage>) -> Self { - Self { inner } - } -} - -impl<'a, Storage> UnrecordedBlocks for NewStorageTransaction<'a, Storage> +impl GasPriceServiceAtomicStorage for Storage where + Storage: 'static, + Storage: GetMetadataStorage + GetDaSequenceNumber, Storage: KeyValueInspect + Modifiable + Send + Sync, { - fn insert(&mut self, height: Height, bytes: Bytes) -> Result<(), Error> { - self.inner - .storage_as_mut::() - .insert(&height, &bytes) - .map_err(|err| { - Error::CouldNotInsertUnrecordedBlock(format!("Error: {:?}", err)) - })?; - Ok(()) - } - - fn remove(&mut self, height: &Height) -> Result, Error> { - let bytes = self - .inner - .storage_as_mut::() - .take(height) - .map_err(|err| { - Error::CouldNotRemoveUnrecordedBlock(format!("Error: {:?}", err)) - })?; - Ok(bytes) - } -} - -impl TransactionableStorage for StructuredStorage -where - Storage: Modifiable + Send + Sync, -{ - type Transaction<'a> = NewStorageTransaction<'a, Storage> where Self: 'a; + type Transaction<'a> = StorageTransaction<&'a mut Storage> where Self: 'a; fn begin_transaction(&mut self) -> GasPriceResult> { let tx = self.write_transaction(); - let wrapped = NewStorageTransaction::wrap(tx); - Ok(wrapped) + Ok(tx) } fn commit_transaction(transaction: Self::Transaction<'_>) -> GasPriceResult<()> { transaction - .inner .commit() .map_err(|err| GasPriceError::CouldNotCommit(err.into()))?; Ok(()) } } -impl<'a, Storage> SetMetadataStorage for NewStorageTransaction<'a, Storage> -where - Storage: KeyValueInspect + Modifiable + Send + Sync, -{ - fn set_metadata(&mut self, metadata: &UpdaterMetadata) -> GasPriceResult<()> { - let block_height = metadata.l2_block_height(); - self.inner - .storage_as_mut::() - .insert(&block_height, metadata) - .map_err(|err| GasPriceError::CouldNotSetMetadata { - block_height, - source_error: err.into(), - })?; - Ok(()) - } -} -impl<'a, Storage> GetMetadataStorage for NewStorageTransaction<'a, Storage> -where - Storage: KeyValueInspect + Send + Sync, -{ - fn get_metadata( - &self, - block_height: &BlockHeight, - ) -> GasPriceResult> { - let metadata = self - .inner - .storage::() - .get(block_height) - .map_err(|err| GasPriceError::CouldNotFetchMetadata { - source_error: err.into(), - })?; - Ok(metadata.map(|inner| inner.into_owned())) - } -} - -impl<'a, Storage> SetDaSequenceNumber for NewStorageTransaction<'a, Storage> +impl SetDaSequenceNumber for Storage where - Storage: KeyValueInspect + Modifiable + Send + Sync, + Storage: Send + Sync, + Storage: StorageMutate, { fn set_sequence_number( &mut self, block_height: &BlockHeight, sequence_number: u32, ) -> GasPriceResult<()> { - self.inner - .storage_as_mut::() + self.storage_as_mut::() .insert(block_height, &sequence_number) .map_err(|err| GasPriceError::CouldNotFetchDARecord(err.into()))?; Ok(()) } } -impl<'a, Storage> GetDaSequenceNumber for NewStorageTransaction<'a, Storage> -where - Storage: KeyValueInspect + Modifiable + Send + Sync, -{ - fn get_sequence_number( - &self, - block_height: &BlockHeight, - ) -> GasPriceResult> { - let sequence_number = self - .inner - .storage::() - .get(block_height) - .map_err(|err| GasPriceError::CouldNotFetchDARecord(err.into()))? - .map(|no| *no); - Ok(sequence_number) - } -} #[derive(Debug, Clone, PartialEq)] pub struct GasPriceSettings { pub gas_price_factor: u64, pub block_gas_limit: u64, } + pub trait GasPriceSettingsProvider: Send + Sync + Clone { fn settings( &self, diff --git a/crates/services/gas_price_service/src/ports.rs b/crates/services/gas_price_service/src/ports.rs index 9b92156bb69..0bd601cf7a8 100644 --- a/crates/services/gas_price_service/src/ports.rs +++ b/crates/services/gas_price_service/src/ports.rs @@ -11,7 +11,10 @@ use crate::{ utils::Result, }, v0::metadata::V0AlgorithmConfig, - v1::metadata::V1AlgorithmConfig, + v1::{ + metadata::V1AlgorithmConfig, + uninitialized_task::fuel_storage_unrecorded_blocks::AsUnrecordedBlocks, + }, }; pub trait L2Data: Send + Sync { @@ -43,12 +46,22 @@ pub trait GetDaSequenceNumber: Send + Sync { fn get_sequence_number(&self, block_height: &BlockHeight) -> Result>; } -pub trait TransactionableStorage: Send + Sync { - type Transaction<'a> +pub trait GasPriceServiceAtomicStorage +where + Self: 'static, + Self: Send + Sync, + Self: GetMetadataStorage + GetDaSequenceNumber, +{ + type Transaction<'a>: AsUnrecordedBlocks + + SetMetadataStorage + + GetMetadataStorage + + SetDaSequenceNumber + + GetDaSequenceNumber where Self: 'a; fn begin_transaction(&mut self) -> Result>; + fn commit_transaction(transaction: Self::Transaction<'_>) -> Result<()>; } diff --git a/crates/services/gas_price_service/src/v1/da_source_service/service.rs b/crates/services/gas_price_service/src/v1/da_source_service/service.rs index 7800d80489c..108f50591e6 100644 --- a/crates/services/gas_price_service/src/v1/da_source_service/service.rs +++ b/crates/services/gas_price_service/src/v1/da_source_service/service.rs @@ -32,10 +32,7 @@ impl SharedState { /// This struct houses the shared_state, polling interval /// and a source, which does the actual fetching of the data -pub struct DaSourceService -where - Source: DaBlockCostsSource, -{ +pub struct DaSourceService { poll_interval: Interval, source: Source, shared_state: SharedState, diff --git a/crates/services/gas_price_service/src/v1/service.rs b/crates/services/gas_price_service/src/v1/service.rs index fbd4959e01b..1ba2462165d 100644 --- a/crates/services/gas_price_service/src/v1/service.rs +++ b/crates/services/gas_price_service/src/v1/service.rs @@ -11,10 +11,10 @@ use crate::{ }, }, ports::{ + GasPriceServiceAtomicStorage, GetMetadataStorage, SetDaSequenceNumber, SetMetadataStorage, - TransactionableStorage, }, v0::metadata::V0Metadata, v1::{ @@ -33,7 +33,10 @@ use crate::{ V1AlgorithmConfig, V1Metadata, }, - uninitialized_task::fuel_storage_unrecorded_blocks::FuelStorageUnrecordedBlocks, + uninitialized_task::fuel_storage_unrecorded_blocks::{ + AsUnrecordedBlocks, + FuelStorageUnrecordedBlocks, + }, }, }; use anyhow::anyhow; @@ -57,11 +60,7 @@ use futures::FutureExt; use tokio::sync::broadcast::Receiver; /// The service that updates the gas price algorithm. -pub struct GasPriceServiceV1 -where - DA: DaBlockCostsSource, - StorageTxProvider: TransactionableStorage, -{ +pub struct GasPriceServiceV1 { /// The algorithm that can be used in the next block shared_algo: SharedV1Algorithm, /// The L2 block source @@ -78,20 +77,16 @@ where storage_tx_provider: StorageTxProvider, } -impl GasPriceServiceV1 +impl GasPriceServiceV1 where L2: L2BlockSource, DA: DaBlockCostsSource, - StorageTxProvider: TransactionableStorage, + AtomicStorage: GasPriceServiceAtomicStorage, { - async fn commit_block_data_to_algorithm<'a>( - &'a mut self, + async fn commit_block_data_to_algorithm( + &mut self, l2_block_res: GasPriceResult, - ) -> anyhow::Result<()> - where - StorageTxProvider::Transaction<'a>: - SetMetadataStorage + UnrecordedBlocks + SetDaSequenceNumber, - { + ) -> anyhow::Result<()> { tracing::info!("Received L2 block result: {:?}", l2_block_res); let block = l2_block_res?; @@ -101,17 +96,17 @@ where } } -impl GasPriceServiceV1 +impl GasPriceServiceV1 where DA: DaBlockCostsSource, - StorageTxProvider: TransactionableStorage, + AtomicStorage: GasPriceServiceAtomicStorage, { pub fn new( l2_block_source: L2, shared_algo: SharedV1Algorithm, algorithm_updater: AlgorithmUpdaterV1, da_source_adapter_handle: DaSourceService, - storage_tx_provider: StorageTxProvider, + storage_tx_provider: AtomicStorage, ) -> Self { let da_source_channel = da_source_adapter_handle.shared_data().clone().subscribe(); @@ -135,7 +130,7 @@ where } #[cfg(test)] - pub fn storage_tx_provider(&self) -> &StorageTxProvider { + pub fn storage_tx_provider(&self) -> &AtomicStorage { &self.storage_tx_provider } @@ -150,18 +145,14 @@ where .ok_or_else(|| anyhow!("Block gas capacity must be non-zero")) } - async fn handle_normal_block<'a>( - &'a mut self, + async fn handle_normal_block( + &mut self, height: u32, gas_used: u64, block_gas_capacity: u64, block_bytes: u64, block_fees: u64, - ) -> anyhow::Result<()> - where - StorageTxProvider::Transaction<'a>: - UnrecordedBlocks + SetMetadataStorage + SetDaSequenceNumber, - { + ) -> anyhow::Result<()> { let capacity = Self::validate_block_gas_capacity(block_gas_capacity)?; let mut storage_tx = self.storage_tx_provider.begin_transaction()?; @@ -171,7 +162,7 @@ where &da_block_costs.l2_blocks, da_block_costs.blob_size_bytes, da_block_costs.blob_cost_wei, - &mut storage_tx, + &mut storage_tx.as_unrecorded_blocks(), )?; storage_tx .set_sequence_number( @@ -187,33 +178,29 @@ where capacity, block_bytes, block_fees as u128, - &mut storage_tx, + &mut storage_tx.as_unrecorded_blocks(), )?; let metadata = self.algorithm_updater.clone().into(); storage_tx .set_metadata(&metadata) .map_err(|err| anyhow!(err))?; - StorageTxProvider::commit_transaction(storage_tx)?; + AtomicStorage::commit_transaction(storage_tx)?; let new_algo = self.algorithm_updater.algorithm(); self.shared_algo.update(new_algo).await; Ok(()) } - async fn apply_block_info_to_gas_algorithm<'a>( - &'a mut self, + async fn apply_block_info_to_gas_algorithm( + &mut self, l2_block: BlockInfo, - ) -> anyhow::Result<()> - where - StorageTxProvider::Transaction<'a>: - SetMetadataStorage + UnrecordedBlocks + SetDaSequenceNumber, - { + ) -> anyhow::Result<()> { match l2_block { BlockInfo::GenesisBlock => { let metadata: UpdaterMetadata = self.algorithm_updater.clone().into(); let mut tx = self.storage_tx_provider.begin_transaction()?; tx.set_metadata(&metadata).map_err(|err| anyhow!(err))?; - StorageTxProvider::commit_transaction(tx)?; + AtomicStorage::commit_transaction(tx)?; let new_algo = self.algorithm_updater.algorithm(); // self.update(new_algo).await; self.shared_algo.update(new_algo).await; @@ -241,14 +228,11 @@ where } #[async_trait] -impl RunnableTask - for GasPriceServiceV1 +impl RunnableTask for GasPriceServiceV1 where L2: L2BlockSource, DA: DaBlockCostsSource, - StorageTxProvider: TransactionableStorage, - for<'a> StorageTxProvider::Transaction<'a>: - UnrecordedBlocks + SetMetadataStorage + SetDaSequenceNumber, + AtomicStorage: GasPriceServiceAtomicStorage, { async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction { tokio::select! { diff --git a/crates/services/gas_price_service/src/v1/tests.rs b/crates/services/gas_price_service/src/v1/tests.rs index 976ed49c625..92fd6757f05 100644 --- a/crates/services/gas_price_service/src/v1/tests.rs +++ b/crates/services/gas_price_service/src/v1/tests.rs @@ -8,7 +8,6 @@ use crate::{ }, GasPriceSettings, GasPriceSettingsProvider, - NewStorageTransaction, }, l2_block_source::L2BlockSource, updater_metadata::UpdaterMetadata, @@ -20,12 +19,12 @@ use crate::{ }, ports::{ GasPriceData, + GasPriceServiceAtomicStorage, GetDaSequenceNumber, GetMetadataStorage, L2Data, SetDaSequenceNumber, SetMetadataStorage, - TransactionableStorage, }, v1::{ algorithm::SharedV1Algorithm, @@ -46,7 +45,7 @@ use crate::{ GasPriceServiceV1, }, uninitialized_task::{ - fuel_storage_unrecorded_blocks::FuelStorageUnrecordedBlocks, + fuel_storage_unrecorded_blocks::AsUnrecordedBlocks, UninitializedTask, }, }, @@ -135,6 +134,7 @@ impl SetMetadataStorage for FakeMetadata { Ok(()) } } + impl GetMetadataStorage for FakeMetadata { fn get_metadata(&self, _: &BlockHeight) -> GasPriceResult> { let metadata = self.inner.lock().unwrap().clone(); @@ -152,6 +152,7 @@ impl SetMetadataStorage for ErroringPersistedData { }) } } + impl GetMetadataStorage for ErroringPersistedData { fn get_metadata(&self, _: &BlockHeight) -> GasPriceResult> { Err(GasPriceError::CouldNotFetchMetadata { @@ -171,7 +172,7 @@ impl GetDaSequenceNumber for ErroringPersistedData { struct UnimplementedStorageTx; -impl TransactionableStorage for ErroringPersistedData { +impl GasPriceServiceAtomicStorage for ErroringPersistedData { type Transaction<'a> = UnimplementedStorageTx; fn begin_transaction(&mut self) -> GasPriceResult> { @@ -188,6 +189,7 @@ impl SetMetadataStorage for UnimplementedStorageTx { unimplemented!() } } + impl GetMetadataStorage for UnimplementedStorageTx { fn get_metadata( &self, @@ -198,11 +200,11 @@ impl GetMetadataStorage for UnimplementedStorageTx { } impl UnrecordedBlocks for UnimplementedStorageTx { - fn insert(&mut self, _height: Height, _bytes: Bytes) -> Result<(), Error> { + fn insert(&mut self, _height: Height, _bytes: Bytes) -> Result<(), String> { unimplemented!() } - fn remove(&mut self, _height: &Height) -> Result, Error> { + fn remove(&mut self, _height: &Height) -> Result, String> { unimplemented!() } } @@ -216,6 +218,7 @@ impl SetDaSequenceNumber for UnimplementedStorageTx { unimplemented!() } } + impl GetDaSequenceNumber for UnimplementedStorageTx { fn get_sequence_number( &self, @@ -225,6 +228,16 @@ impl GetDaSequenceNumber for UnimplementedStorageTx { } } +impl AsUnrecordedBlocks for UnimplementedStorageTx { + type Wrapper<'a> = UnimplementedStorageTx + where + Self: 'a; + + fn as_unrecorded_blocks(&mut self) -> Self::Wrapper<'_> { + UnimplementedStorageTx + } +} + struct FakeDABlockCost { da_block_costs: Receiver, sequence_number: Arc>>, @@ -402,10 +415,6 @@ async fn run__new_l2_block_saves_old_metadata() { let l2_block_source = FakeL2BlockSource { l2_block: l2_block_receiver, }; - let metadata_inner = Arc::new(std::sync::Mutex::new(None)); - let metadata_storage = FakeMetadata { - inner: metadata_inner.clone(), - }; let config = zero_threshold_arbitrary_config(); let inner = database(); @@ -539,20 +548,19 @@ async fn uninitialized_task__new__if_exists_already_reload_old_values_with_overr let different_l2_block = 0; let settings = FakeSettings; let block_stream = empty_block_stream(); - let gas_price_db = FakeGasPriceDb::empty(); let on_chain_db = FakeOnChainDb::new(different_l2_block); let da_cost_source = FakeDABlockCost::never_returns(); let inner = database_with_metadata(&original_metadata); // when let service = UninitializedTask::new( different_config.clone(), + None, 0.into(), settings, block_stream, - gas_price_db, + inner, da_cost_source, on_chain_db, - inner, ) .unwrap(); @@ -645,20 +653,19 @@ async fn uninitialized_task__new__should_fail_if_cannot_fetch_metadata() { let erroring_persisted_data = ErroringPersistedData; let settings = FakeSettings; let block_stream = empty_block_stream(); - let gas_price_db = FakeGasPriceDb::empty(); let on_chain_db = FakeOnChainDb::new(different_l2_block); let da_cost_source = FakeDABlockCost::never_returns(); // when let res = UninitializedTask::new( config, + None, 0.into(), settings, block_stream, - gas_price_db, + erroring_persisted_data, da_cost_source, on_chain_db, - erroring_persisted_data, ); // then @@ -680,7 +687,6 @@ async fn uninitialized_task__init__starts_da_service_with_sequence_number_in_sto let different_l2_block = 0; let settings = FakeSettings; let block_stream = empty_block_stream(); - let gas_price_db = FakeGasPriceDb::new(block_height); let on_chain_db = FakeOnChainDb::new(different_l2_block); let (da_cost_source, sequence_number_handle) = FakeDABlockCost::never_returns_with_handle_to_sequence_number(); @@ -691,13 +697,13 @@ async fn uninitialized_task__init__starts_da_service_with_sequence_number_in_sto StorageTransaction::commit_transaction(tx).unwrap(); let service = UninitializedTask::new( different_config.clone(), + Some(block_height.into()), 0.into(), settings, block_stream, - gas_price_db, + inner, da_cost_source, on_chain_db, - inner, ) .unwrap(); diff --git a/crates/services/gas_price_service/src/v1/uninitialized_task.rs b/crates/services/gas_price_service/src/v1/uninitialized_task.rs index a9a8eb2792a..ac0ce06da01 100644 --- a/crates/services/gas_price_service/src/v1/uninitialized_task.rs +++ b/crates/services/gas_price_service/src/v1/uninitialized_task.rs @@ -18,13 +18,13 @@ use crate::{ }, ports::{ GasPriceData, + GasPriceServiceAtomicStorage, GasPriceServiceConfig, GetDaSequenceNumber, GetMetadataStorage, L2Data, SetDaSequenceNumber, SetMetadataStorage, - TransactionableStorage, }, v1::{ algorithm::SharedV1Algorithm, @@ -44,7 +44,10 @@ use crate::{ initialize_algorithm, GasPriceServiceV1, }, - uninitialized_task::fuel_storage_unrecorded_blocks::FuelStorageUnrecordedBlocks, + uninitialized_task::fuel_storage_unrecorded_blocks::{ + AsUnrecordedBlocks, + FuelStorageUnrecordedBlocks, + }, }, }; use anyhow::Error; @@ -78,14 +81,9 @@ use fuel_gas_price_algorithm::v1::{ pub mod fuel_storage_unrecorded_blocks; -pub struct UninitializedTask< - L2DataStoreView, - GasPriceStore, - DA, - SettingsProvider, - PersistedData, -> { +pub struct UninitializedTask { pub config: V1AlgorithmConfig, + pub gas_metadata_height: Option, pub genesis_block_height: BlockHeight, pub settings: SettingsProvider, pub gas_price_db: GasPriceStore, @@ -94,39 +92,27 @@ pub struct UninitializedTask< pub(crate) shared_algo: SharedV1Algorithm, pub(crate) algo_updater: AlgorithmUpdaterV1, pub(crate) da_source: DA, - pub persisted_data: PersistedData, } -impl< - L2DataStore, - L2DataStoreView, - GasPriceStore, - DA, - SettingsProvider, - PersistedData, - > - UninitializedTask +impl + UninitializedTask where L2DataStore: L2Data, L2DataStoreView: AtomicView, - GasPriceStore: GasPriceData, - PersistedData: GetMetadataStorage + GetDaSequenceNumber, - PersistedData: TransactionableStorage, - for<'a> PersistedData::Transaction<'a>: - SetMetadataStorage + UnrecordedBlocks + SetDaSequenceNumber, + AtomicStorage: GasPriceServiceAtomicStorage, DA: DaBlockCostsSource, SettingsProvider: GasPriceSettingsProvider, { #[allow(clippy::too_many_arguments)] pub fn new( config: V1AlgorithmConfig, + gas_metadata_height: Option, genesis_block_height: BlockHeight, settings: SettingsProvider, block_stream: BoxStream, - gas_price_db: GasPriceStore, + gas_price_db: AtomicStorage, da_source: DA, on_chain_db: L2DataStoreView, - persisted_data: PersistedData, ) -> anyhow::Result { let latest_block_height: u32 = on_chain_db .latest_view()? @@ -135,10 +121,11 @@ where .into(); let (algo_updater, shared_algo) = - initialize_algorithm(&config, latest_block_height, &persisted_data)?; + initialize_algorithm(&config, latest_block_height, &gas_price_db)?; let task = Self { config, + gas_metadata_height, genesis_block_height, settings, gas_price_db, @@ -147,7 +134,6 @@ where algo_updater, shared_algo, da_source, - persisted_data, }; Ok(task) } @@ -155,7 +141,7 @@ where pub async fn init( mut self, ) -> anyhow::Result< - GasPriceServiceV1, DA, PersistedData>, + GasPriceServiceV1, DA, AtomicStorage>, > { let mut first_run = false; let latest_block_height: u32 = self @@ -165,7 +151,7 @@ where .unwrap_or(self.genesis_block_height) .into(); - let maybe_metadata_height = self.gas_price_db.latest_height(); + let maybe_metadata_height = self.gas_metadata_height; let metadata_height = if let Some(metadata_height) = maybe_metadata_height { metadata_height.into() } else { @@ -183,7 +169,7 @@ where // https://github.com/FuelLabs/fuel-core/issues/2140 let poll_interval = None; if let Some(sequence_number) = self - .persisted_data + .gas_price_db .get_sequence_number(&metadata_height.into())? { self.da_source.set_last_value(sequence_number).await?; @@ -198,7 +184,7 @@ where self.shared_algo, self.algo_updater, da_service, - self.persisted_data, + self.gas_price_db, ); Ok(service) } else { @@ -209,7 +195,7 @@ where &self.on_chain_db, metadata_height, latest_block_height, - &mut self.persisted_data, + &mut self.gas_price_db, )?; } @@ -218,7 +204,7 @@ where self.shared_algo, self.algo_updater, da_service, - self.persisted_data, + self.gas_price_db, ); Ok(service) } @@ -226,35 +212,18 @@ where } #[async_trait::async_trait] -impl< - L2DataStore, - L2DataStoreView, - GasPriceStore, - DA, - SettingsProvider, - PersistedData, - > RunnableService - for UninitializedTask< - L2DataStoreView, - GasPriceStore, - DA, - SettingsProvider, - PersistedData, - > +impl RunnableService + for UninitializedTask where L2DataStore: L2Data, L2DataStoreView: AtomicView, - GasPriceStore: GasPriceData, + AtomicStorage: GasPriceServiceAtomicStorage + GasPriceData, DA: DaBlockCostsSource + 'static, SettingsProvider: GasPriceSettingsProvider + 'static, - PersistedData: - GetMetadataStorage + GetDaSequenceNumber + TransactionableStorage + 'static, - for<'a> ::Transaction<'a>: - SetMetadataStorage + UnrecordedBlocks + SetDaSequenceNumber, { const NAME: &'static str = "GasPriceServiceV1"; type SharedData = SharedV1Algorithm; - type Task = GasPriceServiceV1, DA, PersistedData>; + type Task = GasPriceServiceV1, DA, AtomicStorage>; type TaskParams = (); fn shared_data(&self) -> Self::SharedData { @@ -271,26 +240,23 @@ where } fn sync_gas_price_db_with_on_chain_storage< - 'a, L2DataStore, L2DataStoreView, SettingsProvider, - PersistedData, + AtomicStorage, >( settings: &SettingsProvider, config: &V1AlgorithmConfig, on_chain_db: &L2DataStoreView, metadata_height: u32, latest_block_height: u32, - persisted_data: &'a mut PersistedData, + persisted_data: &mut AtomicStorage, ) -> anyhow::Result<()> where L2DataStore: L2Data, L2DataStoreView: AtomicView, SettingsProvider: GasPriceSettingsProvider, - PersistedData: GetMetadataStorage + TransactionableStorage + 'a, - ::Transaction<'a>: - SetMetadataStorage + UnrecordedBlocks, + AtomicStorage: GasPriceServiceAtomicStorage, { let metadata = persisted_data .get_metadata(&metadata_height.into())? @@ -318,31 +284,23 @@ where Ok(()) } -fn sync_v1_metadata< - 'a, - L2DataStore, - L2DataStoreView, - SettingsProvider, - StorageTxGenerator, ->( +fn sync_v1_metadata( settings: &SettingsProvider, on_chain_db: &L2DataStoreView, metadata_height: u32, latest_block_height: u32, updater: &mut AlgorithmUpdaterV1, - storage_tx_generator: &'a mut StorageTxGenerator, + da_storage: &mut AtomicStorage, ) -> anyhow::Result<()> where L2DataStore: L2Data, L2DataStoreView: AtomicView, SettingsProvider: GasPriceSettingsProvider, - StorageTxGenerator: TransactionableStorage + 'a, - ::Transaction<'a>: - SetMetadataStorage + UnrecordedBlocks, + AtomicStorage: GasPriceServiceAtomicStorage, { let first = metadata_height.saturating_add(1); let view = on_chain_db.latest_view()?; - let mut tx = storage_tx_generator.begin_transaction()?; + let mut tx = da_storage.begin_transaction()?; for height in first..=latest_block_height { let block = view .get_block(&height.into())? @@ -371,65 +329,45 @@ where block_gas_capacity, block_bytes, fee_wei.into(), - &mut tx, + &mut tx.as_unrecorded_blocks(), )?; let metadata: UpdaterMetadata = updater.clone().into(); tx.set_metadata(&metadata)?; } - StorageTxGenerator::commit_transaction(tx)?; + AtomicStorage::commit_transaction(tx)?; Ok(()) } #[allow(clippy::type_complexity)] -#[allow(clippy::too_many_arguments)] -pub fn new_gas_price_service_v1< - L2DataStore, - GasPriceStore, - Metadata, - DA, - SettingsProvider, - PersistedData, ->( +pub fn new_gas_price_service_v1( v1_config: V1AlgorithmConfig, genesis_block_height: BlockHeight, settings: SettingsProvider, block_stream: BoxStream, - gas_price_db: GasPriceStore, + gas_price_db: AtomicStorage, da_source: DA, on_chain_db: L2DataStore, - persisted_data: PersistedData, ) -> anyhow::Result< - ServiceRunner< - UninitializedTask< - L2DataStore, - GasPriceStore, - DA, - SettingsProvider, - PersistedData, - >, - >, + ServiceRunner>, > where L2DataStore: AtomicView, L2DataStore::LatestView: L2Data, - GasPriceStore: GasPriceData, + AtomicStorage: GasPriceServiceAtomicStorage + GasPriceData, SettingsProvider: GasPriceSettingsProvider, DA: DaBlockCostsSource, - PersistedData: - GetMetadataStorage + GetDaSequenceNumber + TransactionableStorage + 'static, - for<'a> PersistedData::Transaction<'a>: - SetMetadataStorage + UnrecordedBlocks + SetDaSequenceNumber, { + let metadata_height = gas_price_db.latest_height(); let gas_price_init = UninitializedTask::new( v1_config, + metadata_height, genesis_block_height, settings, block_stream, gas_price_db, da_source, on_chain_db, - persisted_data, )?; Ok(ServiceRunner::new(gas_price_init)) } diff --git a/crates/services/gas_price_service/src/v1/uninitialized_task/fuel_storage_unrecorded_blocks.rs b/crates/services/gas_price_service/src/v1/uninitialized_task/fuel_storage_unrecorded_blocks.rs index 6e3985476ed..780479c2e04 100644 --- a/crates/services/gas_price_service/src/v1/uninitialized_task/fuel_storage_unrecorded_blocks.rs +++ b/crates/services/gas_price_service/src/v1/uninitialized_task/fuel_storage_unrecorded_blocks.rs @@ -11,8 +11,10 @@ use fuel_core_storage::{ Modifiable, WriteTransaction, }, + Error as StorageError, StorageAsMut, StorageAsRef, + StorageMutate, }; use fuel_core_types::fuel_merkle::storage::StorageMutateInfallible; use fuel_gas_price_algorithm::{ @@ -20,6 +22,27 @@ use fuel_gas_price_algorithm::{ v1::UnrecordedBlocks, }; +pub trait AsUnrecordedBlocks { + type Wrapper<'a>: UnrecordedBlocks + where + Self: 'a; + + fn as_unrecorded_blocks(&mut self) -> Self::Wrapper<'_>; +} + +impl AsUnrecordedBlocks for S +where + S: StorageMutate, +{ + type Wrapper<'a> = FuelStorageUnrecordedBlocks<&'a mut Self> + where + Self: 'a; + + fn as_unrecorded_blocks(&mut self) -> Self::Wrapper<'_> { + FuelStorageUnrecordedBlocks::new(self) + } +} + #[derive(Debug, Clone)] pub struct FuelStorageUnrecordedBlocks { inner: Storage, @@ -31,33 +54,24 @@ impl FuelStorageUnrecordedBlocks { } } -impl UnrecordedBlocks for FuelStorageUnrecordedBlocks +impl UnrecordedBlocks for FuelStorageUnrecordedBlocks where - Storage: KeyValueInspect + Modifiable, - Storage: Send + Sync, + S: StorageMutate, { - fn insert(&mut self, height: v1::Height, bytes: v1::Bytes) -> Result<(), v1::Error> { - let mut tx = self.inner.write_transaction(); - tx.storage_as_mut::() + fn insert(&mut self, height: v1::Height, bytes: v1::Bytes) -> Result<(), String> { + self.inner + .storage_as_mut::() .insert(&height, &bytes) - .and_then(|_| tx.commit()) - .map_err(|err| { - v1::Error::CouldNotInsertUnrecordedBlock(format!("Error: {:?}", err)) - })?; + .map_err(|err| format!("Error: {:?}", err))?; Ok(()) } - fn remove(&mut self, height: &v1::Height) -> Result, v1::Error> { - let mut tx = self.inner.write_transaction(); - let bytes = tx + fn remove(&mut self, height: &v1::Height) -> Result, String> { + let bytes = self + .inner .storage_as_mut::() .take(height) - .map_err(|err| { - v1::Error::CouldNotRemoveUnrecordedBlock(format!("Error: {:?}", err)) - })?; - tx.commit().map_err(|err| { - v1::Error::CouldNotRemoveUnrecordedBlock(format!("Error: {:?}", err)) - })?; + .map_err(|err| format!("Error: {:?}", err))?; Ok(bytes) } }