Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(eigen-client-m0-implementation): concurrent da dispatcher #333

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/lib/config/src/configs/da_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub const DEFAULT_POLLING_INTERVAL_MS: u32 = 5000;
pub const DEFAULT_MAX_ROWS_TO_DISPATCH: u32 = 100;
pub const DEFAULT_MAX_RETRIES: u16 = 5;
pub const DEFAULT_USE_DUMMY_INCLUSION_DATA: bool = false;
pub const DEFAULT_MAX_CONCURRENT_REQUESTS: u32 = 100;
juanbono marked this conversation as resolved.
Show resolved Hide resolved

#[derive(Debug, Clone, PartialEq, Deserialize)]
pub struct DADispatcherConfig {
Expand All @@ -19,6 +20,8 @@ pub struct DADispatcherConfig {
// TODO: run a verification task to check if the L1 contract expects the inclusion proofs to
// avoid the scenario where contracts expect real proofs, and server is using dummy proofs.
pub use_dummy_inclusion_data: Option<bool>,
/// The maximun number of concurrent request to send to the DA server.
pub max_concurrent_requests: Option<u32>,
}

impl DADispatcherConfig {
Expand All @@ -28,6 +31,7 @@ impl DADispatcherConfig {
max_rows_to_dispatch: Some(DEFAULT_MAX_ROWS_TO_DISPATCH),
max_retries: Some(DEFAULT_MAX_RETRIES),
use_dummy_inclusion_data: Some(DEFAULT_USE_DUMMY_INCLUSION_DATA),
max_concurrent_requests: Some(DEFAULT_MAX_CONCURRENT_REQUESTS),
}
}

Expand Down
1 change: 1 addition & 0 deletions core/lib/config/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -972,6 +972,7 @@ impl Distribution<configs::da_dispatcher::DADispatcherConfig> for EncodeDist {
max_rows_to_dispatch: self.sample(rng),
max_retries: self.sample(rng),
use_dummy_inclusion_data: self.sample(rng),
max_concurrent_requests: self.sample(rng),
}
}
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 39 additions & 0 deletions core/lib/dal/src/data_availability_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,45 @@ impl DataAvailabilityDal<'_, '_> {
.map(DataAvailabilityBlob::from))
}

pub async fn get_da_blob_ids_awaiting_inclusion(
&mut self,
) -> DalResult<Vec<Option<DataAvailabilityBlob>>> {
let rows = sqlx::query!(
r#"
SELECT
l1_batch_number,
blob_id,
inclusion_data,
sent_at
FROM
data_availability
WHERE
inclusion_data IS NULL
ORDER BY
l1_batch_number
"#,
)
.instrument("get_da_blobs_awaiting_inclusion")
.fetch_all(self.storage)
.await?;

Ok(rows
.into_iter()
.map(|row| {
let l1_batch_number_u32 = row.l1_batch_number.try_into();
if let Ok(l1_batch_number) = l1_batch_number_u32 {
Some(DataAvailabilityBlob {
l1_batch_number: L1BatchNumber(l1_batch_number),
blob_id: row.blob_id,
inclusion_data: row.inclusion_data,
sent_at: row.sent_at.and_utc(),
})
} else {
None
}
})
.collect())
}
/// Fetches the pubdata and `l1_batch_number` for the L1 batches that are ready for DA dispatch.
pub async fn get_ready_for_da_dispatch_l1_batches(
&mut self,
Expand Down
5 changes: 4 additions & 1 deletion core/lib/env_config/src/da_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ mod tests {
interval: u32,
rows_limit: u32,
max_retries: u16,
max_concurrent_requests: u32,
) -> DADispatcherConfig {
DADispatcherConfig {
polling_interval_ms: Some(interval),
max_rows_to_dispatch: Some(rows_limit),
max_retries: Some(max_retries),
use_dummy_inclusion_data: Some(true),
max_concurrent_requests: Some(max_concurrent_requests),
}
}

Expand All @@ -38,9 +40,10 @@ mod tests {
DA_DISPATCHER_MAX_ROWS_TO_DISPATCH=60
DA_DISPATCHER_MAX_RETRIES=7
DA_DISPATCHER_USE_DUMMY_INCLUSION_DATA="true"
DA_DISPATCHER_MAX_CONCURRENT_REQUESTS=10
"#;
lock.set_env(config);
let actual = DADispatcherConfig::from_env().unwrap();
assert_eq!(actual, expected_da_layer_config(5000, 60, 7));
assert_eq!(actual, expected_da_layer_config(5000, 60, 7, 10));
}
}
2 changes: 2 additions & 0 deletions core/lib/protobuf_config/src/da_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ impl ProtoRepr for proto::DataAvailabilityDispatcher {
max_rows_to_dispatch: self.max_rows_to_dispatch,
max_retries: self.max_retries.map(|x| x as u16),
use_dummy_inclusion_data: self.use_dummy_inclusion_data,
max_concurrent_requests: self.max_concurrent_requests,
})
}

Expand All @@ -21,6 +22,7 @@ impl ProtoRepr for proto::DataAvailabilityDispatcher {
max_rows_to_dispatch: this.max_rows_to_dispatch,
max_retries: this.max_retries.map(Into::into),
use_dummy_inclusion_data: this.use_dummy_inclusion_data,
max_concurrent_requests: this.max_concurrent_requests,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ message DataAvailabilityDispatcher {
optional uint32 max_rows_to_dispatch = 2;
optional uint32 max_retries = 3;
optional bool use_dummy_inclusion_data = 4;
optional uint32 max_concurrent_requests = 5;
}
Loading
Loading