optimize inclusion_poller fn
juan518munoz committed Nov 15, 2024
1 parent ae0b69f commit b11df04
Showing 1 changed file with 45 additions and 71 deletions.
core/node/da_dispatcher/src/da_dispatcher.rs: 116 changes (45 additions, 71 deletions)
@@ -2,11 +2,9 @@ use std::{collections::HashSet, future::Future, sync::Arc, time::Duration};
 
 use anyhow::Context;
 use chrono::Utc;
-use futures::future::join_all;
 use rand::Rng;
 use tokio::{
     sync::{
-        mpsc,
         watch::{self, Receiver},
         Mutex, Notify,
     },
@@ -115,13 +113,14 @@ impl DataAvailabilityDispatcher {
             let next_expected_batch = next_expected_batch.clone();
             let pool = self.pool.clone();
             dispatcher_tasks.spawn(async move {
-                let _permit = request_semaphore.clone().acquire_owned().await?;
+                let permit = request_semaphore.clone().acquire_owned().await?;
                 let dispatch_latency = METRICS.blob_dispatch_latency.start();
 
                 let result = retry(config.max_retries(), batch.l1_batch_number, || {
                     client.dispatch_blob(batch.l1_batch_number.0, batch.pubdata.clone())
                 })
                 .await;
+                drop(permit);
                 if result.is_err() {
                     shutdown_tx.clone().send(true)?;
                     notifier.notify_waiters();
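
Note: this hunk narrows the lifetime of the dispatch permit. The permit taken from request_semaphore is now bound to a name and dropped as soon as dispatch_blob returns, so a concurrency slot is released before the error handling that follows rather than being held for the rest of the task. A minimal sketch of that pattern, assuming a Tokio Semaphore and hypothetical names (limiter, do_rate_limited_call) that are not part of this repository:

use std::{sync::Arc, time::Duration};

use tokio::sync::Semaphore;

// Stand-in for the actual rate-limited call (dispatch_blob in the diff).
async fn do_rate_limited_call(id: u32) {
    tokio::time::sleep(Duration::from_millis(50)).await;
    println!("dispatched {id}");
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // At most 3 calls in flight at once.
    let limiter = Arc::new(Semaphore::new(3));

    let mut tasks = Vec::new();
    for id in 0..10u32 {
        let limiter = limiter.clone();
        tasks.push(tokio::spawn(async move {
            // Owned permit, so it can be moved into the spawned task.
            let permit = limiter.acquire_owned().await?;
            do_rate_limited_call(id).await;
            // Release the slot as soon as the throttled call is done,
            // before any follow-up work that does not need throttling.
            drop(permit);
            anyhow::Ok(())
        }));
    }
    for task in tasks {
        task.await??;
    }
    Ok(())
}

Dropping the permit explicitly, instead of letting it fall out of scope at the end of the task, is what shortens the throttled section.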
@@ -212,64 +211,34 @@ impl DataAvailabilityDispatcher {
     }
 
     async fn inclusion_poller(&self, stop_receiver: Receiver<bool>) -> anyhow::Result<()> {
-        let (tx, mut rx) = mpsc::channel(
-            self.config
-                .max_concurrent_requests
-                .unwrap_or(DEFAULT_MAX_CONCURRENT_REQUESTS) as usize,
-        );
+        let mut pending_inclusions = HashSet::new();
+        let mut inclusion_tasks: JoinSet<anyhow::Result<()>> = JoinSet::new();
 
-        let stop_receiver_clone = stop_receiver.clone();
-        let pool_clone = self.pool.clone();
-        let pending_inclusion_reader = tokio::spawn(async move {
-            let mut pending_inclusions = HashSet::new();
-            loop {
-                if *stop_receiver_clone.borrow() {
-                    break;
-                }
+        loop {
+            if *stop_receiver.borrow() {
+                break;
+            }
 
-                let mut conn = pool_clone.connection_tagged("da_dispatcher").await?;
-                let pending_blobs = conn
-                    .data_availability_dal()
-                    .get_da_blob_ids_awaiting_inclusion()
-                    .await?;
-                drop(conn);
+            let mut conn = self.pool.connection_tagged("da_dispatcher").await?;
+            let pending_blobs = conn
+                .data_availability_dal()
+                .get_da_blob_ids_awaiting_inclusion()
+                .await?;
+            drop(conn);
 
-                for blob_info in pending_blobs.into_iter().flatten() {
-                    if pending_inclusions.contains(&blob_info.blob_id) {
-                        continue;
-                    }
-                    pending_inclusions.insert(blob_info.blob_id.clone());
-                    tx.send(blob_info).await?;
-                }
-                tokio::time::sleep(Duration::from_secs(5)).await;
-            }
-            Ok::<(), anyhow::Error>(())
-        });
-
-        let pool = self.pool.clone();
-        let config = self.config.clone();
-        let client = self.client.clone();
-        let semaphore = self.request_semaphore.clone();
-        let pending_inclusion_sender = tokio::spawn(async move {
-            let mut spawned_requests = vec![];
-            loop {
-                if *stop_receiver.borrow() {
-                    break;
+            for blob_info in pending_blobs.into_iter().flatten() {
+                if pending_inclusions.contains(&blob_info.blob_id) {
+                    continue;
                 }
-                let blob_info = match rx.recv().await {
-                    Some(blob_info) => blob_info,
-                    None => continue, // Should never happen
-                };
-
-                // Block until we can send the request
-                let permit = semaphore.clone().acquire_owned().await?;
-
-                let client = client.clone();
-                let pool = pool.clone();
-                let config = config.clone();
-                let request = tokio::spawn(async move {
-                    let _permit = permit; // move permit into scope
+                pending_inclusions.insert(blob_info.blob_id.clone());
+
+                let client = self.client.clone();
+                let config = self.config.clone();
+                let pool = self.pool.clone();
+                let request_semaphore = self.request_semaphore.clone();
+                inclusion_tasks.spawn(async move {
                     let inclusion_data = if config.use_dummy_inclusion_data() {
+                        let _permit = request_semaphore.acquire_owned().await?;
                         client
                             .get_inclusion_data(blob_info.blob_id.as_str())
                             .await
@@ -279,11 +248,11 @@ impl DataAvailabilityDispatcher {
                                     blob_info.blob_id, blob_info.l1_batch_number
                                 )
                             })?
-                    } else {
-                        // if the inclusion verification is disabled, we don't need to wait for the inclusion
-                        // data before committing the batch, so simply return an empty vector
-                        Some(InclusionData { data: vec![] })
-                    };
+                } else {
+                    // if the inclusion verification is disabled, we don't need to wait for the inclusion
+                    // data before committing the batch, so simply return an empty vector
+                    Some(InclusionData { data: vec![] })
+                };
 
                     let Some(inclusion_data) = inclusion_data else {
                         return Ok(());
@@ -295,7 +264,7 @@ impl DataAvailabilityDispatcher {
                             L1BatchNumber(blob_info.l1_batch_number.0),
                             inclusion_data.data.as_slice(),
                         )
-                        .await?;
+                    .await?;
                     drop(conn);
 
                     let inclusion_latency = Utc::now().signed_duration_since(blob_info.sent_at);
@@ -312,19 +281,24 @@ impl DataAvailabilityDispatcher {
                         blob_info.l1_batch_number,
                         inclusion_latency.num_seconds()
                     );
-
-                    Ok::<(), anyhow::Error>(())
+                    Ok(())
                 });
-                spawned_requests.push(request);
             }
-            join_all(spawned_requests).await;
-            Ok::<(), anyhow::Error>(())
-        });
 
-        let results = join_all(vec![pending_inclusion_reader, pending_inclusion_sender]).await;
-        for result in results {
-            result??;
+            // Sleep so we prevent hammering the database
+            tokio::time::sleep(Duration::from_secs(5)).await;
         }
 
+        while let Some(next) = inclusion_tasks.join_next().await {
+            match next {
+                Ok(_) => (),
+                Err(e) => {
+                    inclusion_tasks.shutdown().await;
+                    return Err(e.into());
+                }
+            }
+        }
+
         Ok(())
     }
+}
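
Note: taken together with the import changes above, the commit replaces the previous two-task pipeline in inclusion_poller (a reader task feeding blob ids into an mpsc channel and a sender task spawning one request per id, both awaited with join_all) with a single polling loop: pending ids are deduplicated in a HashSet, each inclusion request is spawned onto a JoinSet bounded by request_semaphore, and the set is drained once the stop signal is observed. A minimal, self-contained sketch of that JoinSet-based polling pattern, assuming hypothetical names (fetch_pending, handle, limiter) and a fixed number of polling rounds so the example terminates; it is not code from this repository:

use std::{collections::HashSet, sync::Arc, time::Duration};

use tokio::{sync::Semaphore, task::JoinSet};

// Stand-in for one poll of the store; returns ids that still need work.
async fn fetch_pending() -> anyhow::Result<Vec<String>> {
    Ok(vec!["blob-1".to_string(), "blob-2".to_string()])
}

// Stand-in for the per-id request (fetching inclusion data in the diff).
async fn handle(id: String) -> anyhow::Result<()> {
    println!("handled {id}");
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let limiter = Arc::new(Semaphore::new(4));
    let mut seen = HashSet::new();
    let mut tasks: JoinSet<anyhow::Result<()>> = JoinSet::new();

    for _round in 0..3 {
        for id in fetch_pending().await? {
            // Skip ids that already have a task in flight.
            if !seen.insert(id.clone()) {
                continue;
            }
            let limiter = limiter.clone();
            tasks.spawn(async move {
                // Concurrency is bounded inside the task, as in the new poller.
                let _permit = limiter.acquire_owned().await?;
                handle(id).await
            });
        }
        // Back off between polls so the store is not hammered.
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    // Drain the set; a JoinError here means a task panicked or was cancelled.
    while let Some(next) = tasks.join_next().await {
        next??;
    }
    Ok(())
}

JoinSet::join_next surfaces panics in spawned tasks as JoinError, which is what the new while let loop in the diff checks before shutting the set down and returning the error.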