Skip to content

Commit

Permalink
chore(async_trait): reduce usage of async_trait for small types again
Browse files Browse the repository at this point in the history
  • Loading branch information
rymnc committed Jan 26, 2025
1 parent 75d6c5a commit a9d4adc
Show file tree
Hide file tree
Showing 14 changed files with 51 additions and 52 deletions.
3 changes: 1 addition & 2 deletions crates/fuel-core/src/graphql_api/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,10 +284,9 @@ pub trait P2pPort: Send + Sync {
}

/// Trait for defining how to estimate gas price for future blocks
#[async_trait::async_trait]
pub trait GasPriceEstimate: Send + Sync {
/// The worst case scenario for gas price at a given horizon
async fn worst_case_gas_price(&self, height: BlockHeight) -> Option<u64>;
fn worst_case_gas_price(&self, height: BlockHeight) -> Option<u64>;
}

/// Trait for getting VM memory.
Expand Down
1 change: 0 additions & 1 deletion crates/fuel-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ impl ShutdownListener {
}
}

#[async_trait::async_trait]
impl NotifyCancel for ShutdownListener {
async fn wait_until_cancelled(&self) -> anyhow::Result<()> {
self.token.cancelled().await;
Expand Down
1 change: 0 additions & 1 deletion crates/fuel-core/src/schema/gas_price.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ impl EstimateGasPriceQuery {
let gas_price_provider = ctx.data_unchecked::<GasPriceProvider>();
let gas_price = gas_price_provider
.worst_case_gas_price(target_block.into())
.await
.ok_or(async_graphql::Error::new(format!(
"Failed to estimate gas price for block, algorithm not yet set: {target_block:?}"
)))?;
Expand Down
19 changes: 6 additions & 13 deletions crates/fuel-core/src/service/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ mod universal_gas_price_provider_tests {
use super::*;
use proptest::proptest;

async fn _worst_case__correctly_calculates_value(
fn _worst_case__correctly_calculates_value(
gas_price: u64,
starting_height: u32,
block_horizon: u32,
Expand All @@ -112,10 +112,7 @@ mod universal_gas_price_provider_tests {

// when
let target_height = starting_height.saturating_add(block_horizon);
let estimated = subject
.worst_case_gas_price(target_height.into())
.await
.unwrap();
let estimated = subject.worst_case_gas_price(target_height.into()).unwrap();

// then
let mut actual = gas_price;
Expand All @@ -137,13 +134,12 @@ mod universal_gas_price_provider_tests {
block_horizon in 0..10_000u32,
percentage: u16,
) {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(_worst_case__correctly_calculates_value(
_worst_case__correctly_calculates_value(
gas_price,
starting_height,
block_horizon,
percentage,
));
);
}
}

Expand All @@ -155,15 +151,13 @@ mod universal_gas_price_provider_tests {
block_horizon in 0..10_000u32,
percentage: u16
) {
let rt = tokio::runtime::Runtime::new().unwrap();

// given
let subject = UniversalGasPriceProvider::new(starting_height, gas_price, percentage);

// when
let target_height = starting_height.saturating_add(block_horizon);

let _ = rt.block_on(subject.worst_case_gas_price(target_height.into()));
let _ = subject.worst_case_gas_price(target_height.into());

// then
// doesn't panic with an overflow
Expand Down Expand Up @@ -273,9 +267,8 @@ impl TxPoolGasPriceProvider for UniversalGasPriceProvider<u32, u64> {
}
}

#[async_trait::async_trait]
impl GasPriceEstimate for UniversalGasPriceProvider<u32, u64> {
async fn worst_case_gas_price(&self, height: BlockHeight) -> Option<u64> {
fn worst_case_gas_price(&self, height: BlockHeight) -> Option<u64> {
let (best_height, best_gas_price) = self.get_height_and_gas_price();
let percentage = self.percentage;

Expand Down
3 changes: 1 addition & 2 deletions crates/fuel-core/src/service/adapters/graphql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,8 @@ impl worker::TxPool for TxPoolAdapter {
}
}

#[async_trait::async_trait]
impl GasPriceEstimate for StaticGasPrice {
async fn worst_case_gas_price(&self, _height: BlockHeight) -> Option<u64> {
fn worst_case_gas_price(&self, _height: BlockHeight) -> Option<u64> {
Some(self.gas_price)
}
}
Expand Down
2 changes: 0 additions & 2 deletions crates/fuel-core/src/service/adapters/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ impl PeerReport for P2PAdapterPeerReport {
}
}

#[async_trait::async_trait]
impl BlockImporterPort for BlockImporterAdapter {
fn committed_height_stream(&self) -> BoxStream<BlockHeight> {
use futures::StreamExt;
Expand All @@ -160,7 +159,6 @@ impl BlockImporterPort for BlockImporterAdapter {
}
}

#[async_trait::async_trait]
impl ConsensusPort for ConsensusAdapter {
fn check_sealed_header(&self, header: &SealedBlockHeader) -> anyhow::Result<bool> {
Ok(self.block_verifier.verify_consensus(header))
Expand Down
7 changes: 3 additions & 4 deletions crates/fuel-core/src/service/genesis/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ pub struct TaskManager<T> {
cancel_token: CancellationToken,
}

#[async_trait::async_trait]
pub trait NotifyCancel {
async fn wait_until_cancelled(&self) -> anyhow::Result<()>;
fn wait_until_cancelled(
&self,
) -> impl core::future::Future<Output = anyhow::Result<()>> + Send;
fn is_cancelled(&self) -> bool;
}

#[async_trait::async_trait]
impl NotifyCancel for tokio_util::sync::CancellationToken {
async fn wait_until_cancelled(&self) -> anyhow::Result<()> {
self.cancelled().await;
Expand All @@ -30,7 +30,6 @@ impl NotifyCancel for tokio_util::sync::CancellationToken {
}
}

#[async_trait::async_trait]
impl NotifyCancel for StateWatcher {
async fn wait_until_cancelled(&self) -> anyhow::Result<()> {
let mut state = self.clone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ where
Self(Arc::new(parking_lot::RwLock::new(algorithm)))
}

pub async fn update(&mut self, new_algo: A) {
pub fn update(&mut self, new_algo: A) {
let mut write_lock = self.0.write();
*write_lock = new_algo;
}
Expand All @@ -37,7 +37,7 @@ where
self.0.read().next_gas_price()
}

pub async fn worst_case_gas_price(&self, block_height: BlockHeight) -> u64 {
pub fn worst_case_gas_price(&self, block_height: BlockHeight) -> u64 {
self.0.read().worst_case_gas_price(block_height)
}
}
6 changes: 3 additions & 3 deletions crates/services/gas_price_service/src/v0/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ where
self.shared_algo.clone()
}

async fn update(&mut self, new_algorithm: AlgorithmV0) {
self.shared_algo.update(new_algorithm).await;
fn update(&mut self, new_algorithm: AlgorithmV0) {
self.shared_algo.update(new_algorithm);
}

fn validate_block_gas_capacity(
Expand Down Expand Up @@ -115,7 +115,7 @@ where
}
}

self.update(self.algorithm_updater.algorithm()).await;
self.update(self.algorithm_updater.algorithm());
Ok(())
}
}
Expand Down
8 changes: 4 additions & 4 deletions crates/services/gas_price_service/src/v1/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,8 @@ where
&self.storage_tx_provider
}

async fn update(&mut self, new_algorithm: AlgorithmV1) {
self.shared_algo.update(new_algorithm).await;
fn update(&mut self, new_algorithm: AlgorithmV1) {
self.shared_algo.update(new_algorithm);
}

fn validate_block_gas_capacity(
Expand Down Expand Up @@ -304,7 +304,7 @@ where
AtomicStorage::commit_transaction(storage_tx)?;
let new_algo = self.algorithm_updater.algorithm();
tracing::debug!("Updating gas price: {}", &new_algo.calculate());
self.shared_algo.update(new_algo).await;
self.shared_algo.update(new_algo);
// Clear the buffer after committing changes
self.da_block_costs_buffer.clear();
Ok(())
Expand All @@ -321,7 +321,7 @@ where
tx.set_metadata(&metadata).map_err(|err| anyhow!(err))?;
AtomicStorage::commit_transaction(tx)?;
let new_algo = self.algorithm_updater.algorithm();
self.shared_algo.update(new_algo).await;
self.shared_algo.update(new_algo);
}
BlockInfo::Block {
height,
Expand Down
35 changes: 23 additions & 12 deletions crates/services/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,35 @@ pub struct EmptyShared;

/// Trait for service runners, providing a minimal interface for managing
/// the lifecycle of services such as start/stop and health status.
#[async_trait::async_trait]
pub trait Service {
/// Send a start signal to the service without waiting for it to start.
/// Returns an error if the service was already started.
fn start(&self) -> anyhow::Result<()>;

/// Send a start signal to the service and wait for it to start up.
/// Returns an error if the service was already started.
async fn start_and_await(&self) -> anyhow::Result<State>;
fn start_and_await(
&self,
) -> impl core::future::Future<Output = anyhow::Result<State>> + Send;

/// Wait for service to start or stop (without sending any signal).
async fn await_start_or_stop(&self) -> anyhow::Result<State>;
fn await_start_or_stop(
&self,
) -> impl core::future::Future<Output = anyhow::Result<State>> + Send;

/// Send a stop signal to the service without waiting for it to shutdown.
/// Returns false if the service was already stopped, true if it is running.
fn stop(&self) -> bool;

/// Send stop signal to service and wait for it to shutdown.
async fn stop_and_await(&self) -> anyhow::Result<State>;
fn stop_and_await(
&self,
) -> impl core::future::Future<Output = anyhow::Result<State>> + Send;

/// Wait for service to stop (without sending a stop signal).
async fn await_stop(&self) -> anyhow::Result<State>;
fn await_stop(
&self,
) -> impl core::future::Future<Output = anyhow::Result<State>> + Send;

/// The current state of the service (i.e. `Started`, `Stopped`, etc..)
fn state(&self) -> State;
Expand Down Expand Up @@ -145,16 +152,17 @@ pub trait RunnableTask: Send {
#[derive(Debug)]
pub struct ServiceRunner<S>
where
S: RunnableService + 'static,
S: RunnableService + 'static + Sync,
{
/// The shared state of the service
pub shared: S::SharedData,
state: Shared<watch::Sender<State>>,
_marker: core::marker::PhantomData<S>,
}

impl<S> Drop for ServiceRunner<S>
where
S: RunnableService + 'static,
S: RunnableService + 'static + Sync,
{
fn drop(&mut self) {
self.stop();
Expand All @@ -163,7 +171,7 @@ where

impl<S> ServiceRunner<S>
where
S: RunnableService + 'static,
S: RunnableService + 'static + Sync,
S::TaskParams: Default,
{
/// Initializes a new `ServiceRunner` containing a `RunnableService`
Expand All @@ -174,14 +182,18 @@ where

impl<S> ServiceRunner<S>
where
S: RunnableService + 'static,
S: RunnableService + 'static + Sync,
{
/// Initializes a new `ServiceRunner` containing a `RunnableService` with parameters for underlying `Task`
pub fn new_with_params(service: S, params: S::TaskParams) -> Self {
let shared = service.shared_data();
let metric = FuturesMetrics::obtain_futures_metrics(S::NAME);
let state = initialize_loop(service, params, metric);
Self { shared, state }
Self {
shared,
state,
_marker: core::marker::PhantomData::<S>::default(),
}
}

async fn _await_start_or_stop(
Expand All @@ -208,10 +220,9 @@ where
}
}

#[async_trait::async_trait]
impl<S> Service for ServiceRunner<S>
where
S: RunnableService + 'static,
S: RunnableService + 'static + Sync,
{
fn start(&self) -> anyhow::Result<()> {
let started = self.state.send_if_modified(|state| {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use std::time::Duration;

pub struct PressureBlockImporter(MockBlockImporterPort, Duration, SharedCounts);

#[async_trait::async_trait]
impl BlockImporterPort for PressureBlockImporter {
fn committed_height_stream(&self) -> BoxStream<BlockHeight> {
self.0.committed_height_stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use std::time::Duration;

pub struct PressureConsensus(MockConsensusPort, Duration, SharedCounts);

#[async_trait::async_trait]
impl ConsensusPort for PressureConsensus {
fn check_sealed_header(&self, header: &SealedBlockHeader) -> anyhow::Result<bool> {
self.0.check_sealed_header(header)
Expand Down
12 changes: 8 additions & 4 deletions crates/services/sync/src/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,23 +65,27 @@ pub trait PeerToPeerPort {
}

#[cfg_attr(any(test, feature = "benchmarking"), mockall::automock)]
#[async_trait::async_trait]
/// Port for communication with the consensus service.
pub trait ConsensusPort {
/// Check if the given sealed block header is valid.
fn check_sealed_header(&self, header: &SealedBlockHeader) -> anyhow::Result<bool>;
/// await for this DA height to be sync'd.
async fn await_da_height(&self, da_height: &DaBlockHeight) -> anyhow::Result<()>;
fn await_da_height(
&self,
da_height: &DaBlockHeight,
) -> impl core::future::Future<Output = anyhow::Result<()>> + Send;
}

#[cfg_attr(any(test, feature = "benchmarking"), mockall::automock)]
#[async_trait::async_trait]
/// Port for communication with the block importer.
pub trait BlockImporterPort {
/// Stream of newly committed block heights.
fn committed_height_stream(&self) -> BoxStream<BlockHeight>;

/// Execute the given sealed block
/// and commit it to the database.
async fn execute_and_commit(&self, block: SealedBlock) -> anyhow::Result<()>;
fn execute_and_commit(
&self,
block: SealedBlock,
) -> impl core::future::Future<Output = anyhow::Result<()>> + Send;
}

0 comments on commit a9d4adc

Please sign in to comment.