fix(mev-builder-rs): Parallelize Proposer Schedule Relay Querying #203

Open · wants to merge 1 commit into base: main
59 changes: 46 additions & 13 deletions mev-build-rs/src/reth_builder/builder.rs
@@ -9,7 +9,11 @@ use ethereum_consensus::{
     state_transition::Context,
 };
 use ethers::signers::{LocalWallet, Signer};
-use mev_rs::{blinded_block_relayer::BlindedBlockRelayer, compute_preferred_gas_limit, Relay};
+use futures::{future::join_all, stream::FuturesUnordered, StreamExt};
+use mev_rs::{
+    blinded_block_relayer::BlindedBlockRelayer, compute_preferred_gas_limit,
+    types::ProposerSchedule, Relay,
+};
 use reth_payload_builder::PayloadBuilderAttributes;
 use reth_primitives::{Address, BlockNumberOrTag, Bytes, ChainSpec, B256, U256};
 use reth_provider::{BlockReaderIdExt, BlockSource, StateProviderFactory};
@@ -124,19 +128,48 @@ impl<Pool, Client> Builder<Pool, Client> {
     }
 
     async fn on_epoch(&self, epoch: Epoch) {
-        // TODO: concurrent fetch
-        // TODO: batch updates to auction schedule
-        for relay in self.relays.iter() {
-            match relay.get_proposal_schedule().await {
-                Ok(schedule) => {
-                    let slots = self.auction_schedule.process(relay.clone(), &schedule);
-                    tracing::info!(epoch, ?slots, %relay, "processed proposer schedule");
-                }
-                Err(err) => {
-                    tracing::warn!(err = %err, "error fetching proposer schedule from relay")
-                }
-            }
+        // Multiple tasks query relays and forward the schedule results
+        // to the receiver stream. The channel is bounded to 16, and will
+        // apply backpressure when needed.
+        let (tx, rx) = mpsc::channel::<(usize, Vec<ProposerSchedule>)>(16);
+        let mut rx = ReceiverStream::new(rx);
+
+        // Spawn tasks to fetch ProposerSchedule lists.
+        let tasks = self
+            .relays
+            .iter()
+            .enumerate()
+            .map(|(i, relay)| {
+                let relay = relay.clone();
+                let tx = tx.clone();
+                tokio::spawn(async move {
+                    let schedule = relay.get_proposal_schedule();
+                    let schedule = match schedule.await {
+                        Ok(schedule) => schedule,
+                        Err(e) => {
+                            tracing::error!(%e, "error fetching proposal schedule");
+                            return;
+                        }
+                    };
+                    // Send the schedules to the processing task.
+                    if let Err(e) = tx.send((i, schedule)).await {
+                        tracing::error!(%e, "error sending proposal schedule for processing");
+                    }
+                })
+            })
+            .collect::<FuturesUnordered<_>>();
+
+        // Stream the schedules for processing via the auction schedule.
+        while let Some((i, schedule)) = rx.next().await {

Review comment (on the `while let` line above): tx needs to be dropped after creating tasks and before processing the rx stream, otherwise it will never close since not all senders will be dropped.
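A minimal sketch of the fix this comment suggests, reusing the `tx`, `rx`, and `tasks` names from the diff above; the only change is an explicit `drop(tx)` between spawning the tasks and draining the stream:

    // ... `tasks` collected into `FuturesUnordered` as above ...

    // Each spawned task owns a clone of `tx`. Dropping the original
    // sender here means the channel closes once every task finishes,
    // so `rx.next()` eventually yields `None` and the loop below ends.
    drop(tx);

    // Stream the schedules for processing via the auction schedule.
    while let Some((i, schedule)) = rx.next().await {
        // ... process as in the diff ...
    }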

+            let relay = self.relays[i].clone();
+            self.auction_schedule.process(relay.clone(), &schedule);
+            tracing::info!(epoch, %relay, "processed proposer schedule");
         }
+
+        // Sanity check to ensure all tasks finish querying relays.
+        join_all(tasks).await;
+
+        // Clear the auction schedule for the next epoch.
         let slot = epoch * self.context.slots_per_epoch;
         self.auction_schedule.clear(slot);
     }
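For context on the design choice under review, the same fan-out can be written without a channel at all, which avoids the dropped-sender pitfall entirely. Below is a self-contained sketch, not taken from this PR: `Relay`, its `get_proposal_schedule`, and `fetch_all_schedules` are hypothetical stand-ins for the types in the diff.

    use futures::{stream::FuturesUnordered, StreamExt};

    // Hypothetical stand-in for the relay client used in the PR.
    #[derive(Clone)]
    struct Relay(&'static str);

    impl Relay {
        async fn get_proposal_schedule(&self) -> Result<Vec<u64>, String> {
            Ok(vec![1, 2, 3]) // placeholder schedule entries
        }
    }

    // Query every relay concurrently and process each result as it
    // completes. The stream ends once all futures resolve, so there is
    // no sender to remember to drop and no separate join step.
    async fn fetch_all_schedules(relays: &[Relay]) {
        let mut fetches = relays
            .iter()
            .cloned()
            .map(|relay| async move {
                let result = relay.get_proposal_schedule().await;
                (relay, result)
            })
            .collect::<FuturesUnordered<_>>();

        while let Some((relay, result)) = fetches.next().await {
            match result {
                Ok(schedule) => println!("schedule from {}: {schedule:?}", relay.0),
                Err(err) => eprintln!("error from {}: {err}", relay.0),
            }
        }
    }

    #[tokio::main]
    async fn main() {
        fetch_all_schedules(&[Relay("relay-a"), Relay("relay-b")]).await;
    }

One trade-off: without `tokio::spawn`, the fetches run concurrently on a single task, whereas spawning (as the PR does) also spreads them across the runtime's worker threads.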