diff --git a/mev-build-rs/src/reth_builder/builder.rs b/mev-build-rs/src/reth_builder/builder.rs index 5b78291f..e8ff764f 100644 --- a/mev-build-rs/src/reth_builder/builder.rs +++ b/mev-build-rs/src/reth_builder/builder.rs @@ -9,7 +9,11 @@ use ethereum_consensus::{ state_transition::Context, }; use ethers::signers::{LocalWallet, Signer}; -use mev_rs::{blinded_block_relayer::BlindedBlockRelayer, compute_preferred_gas_limit, Relay}; +use futures::{future::join_all, stream::FuturesUnordered, StreamExt}; +use mev_rs::{ + blinded_block_relayer::BlindedBlockRelayer, compute_preferred_gas_limit, + types::ProposerSchedule, Relay, +}; use reth_payload_builder::PayloadBuilderAttributes; use reth_primitives::{Address, BlockNumberOrTag, Bytes, ChainSpec, B256, U256}; use reth_provider::{BlockReaderIdExt, BlockSource, StateProviderFactory}; @@ -124,19 +128,48 @@ impl Builder { } async fn on_epoch(&self, epoch: Epoch) { - // TODO: concurrent fetch - // TODO: batch updates to auction schedule - for relay in self.relays.iter() { - match relay.get_proposal_schedule().await { - Ok(schedule) => { - let slots = self.auction_schedule.process(relay.clone(), &schedule); - tracing::info!(epoch, ?slots, %relay, "processed proposer schedule"); - } - Err(err) => { - tracing::warn!(err = %err, "error fetching proposer schedule from relay") - } - } + // Multiple threads query relays and forward the schedule results + // to the receiver stream. The channel is bounded to 16, and will + // apply backpressure when needed. + let (tx, rx) = mpsc::channel::<(usize, Vec)>(16); + let mut rx = ReceiverStream::new(rx); + + // Spawn tasks to fetch ProposerSchedule lists. + let tasks = self + .relays + .iter() + .enumerate() + .map(|(i, relay)| { + let relay = relay.clone(); + let tx = tx.clone(); + tokio::spawn(async move { + let schedule = relay.get_proposal_schedule(); + let schedule = match schedule.await { + Ok(schedule) => schedule, + Err(e) => { + tracing::error!(%e, "error fetching proposal schedule"); + return; + } + }; + // Send the schedules to the processing task. + if let Err(e) = tx.send((i, schedule)).await { + tracing::error!(%e, "error sending proposal schedule for processing"); + } + }) + }) + .collect::>(); + + // Stream the schedules for processing via the auction schedule. + while let Some((i, schedule)) = rx.next().await { + let relay = self.relays[i].clone(); + self.auction_schedule.process(relay.clone(), &schedule); + tracing::info!(epoch, %relay, "processed proposer schedule"); } + + // Sanity check to ensure all threads finish querying relays. + join_all(tasks).await; + + // Clear the auction schedule for the next epoch. let slot = epoch * self.context.slots_per_epoch; self.auction_schedule.clear(slot); }