Skip to content

Commit

Permalink
feat: Pipeline Builder (#217)
Browse files Browse the repository at this point in the history
* fix: pipeline builder

* feat: pipeline builder

* fix: use online provider impls

* fix: feature flag builder example
  • Loading branch information
refcell authored Jun 6, 2024
1 parent 3b594c8 commit a606d08
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 68 deletions.
71 changes: 26 additions & 45 deletions crates/derive/src/online/blob_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,8 @@ use crate::{
types::{APIBlobSidecar, Blob, BlobProviderError, BlobSidecar, BlockInfo, IndexedBlobHash},
};
use alloc::{boxed::Box, vec::Vec};
use alloy_provider::Provider;
use alloy_transport_http::Http;
use async_trait::async_trait;
use core::marker::PhantomData;
use reqwest::Client;
use tracing::debug;

/// Specifies the derivation of a slot from a timestamp.
Expand All @@ -21,9 +18,7 @@ pub trait SlotDerivation {

/// An online implementation of the [BlobProvider] trait.
#[derive(Debug, Clone)]
pub struct OnlineBlobProvider<T: Provider<Http<Client>>, B: BeaconClient, S: SlotDerivation> {
/// The inner Ethereum JSON-RPC provider.
_inner: T,
pub struct OnlineBlobProvider<B: BeaconClient, S: SlotDerivation> {
/// Whether to fetch all sidecars.
fetch_all_sidecars: bool,
/// The Beacon API client.
Expand All @@ -36,21 +31,19 @@ pub struct OnlineBlobProvider<T: Provider<Http<Client>>, B: BeaconClient, S: Slo
_slot_derivation: PhantomData<S>,
}

impl<T: Provider<Http<Client>>, B: BeaconClient, S: SlotDerivation> OnlineBlobProvider<T, B, S> {
impl<B: BeaconClient, S: SlotDerivation> OnlineBlobProvider<B, S> {
/// Creates a new instance of the [OnlineBlobProvider].
///
/// The `genesis_time` and `slot_interval` arguments are _optional_ and the
/// [OnlineBlobProvider] will attempt to load them dynamically at runtime if they are not
/// provided.
pub fn new(
_inner: T,
fetch_all_sidecars: bool,
beacon_client: B,
genesis_time: Option<u64>,
slot_interval: Option<u64>,
) -> Self {
Self {
_inner,
fetch_all_sidecars,
beacon_client,
genesis_time,
Expand Down Expand Up @@ -105,9 +98,8 @@ impl SlotDerivation for SimpleSlotDerivation {
}

#[async_trait]
impl<T, B, S> BlobProvider for OnlineBlobProvider<T, B, S>
impl<B, S> BlobProvider for OnlineBlobProvider<B, S>
where
T: Provider<Http<Client>> + Send,
B: BeaconClient + Send + Sync,
S: SlotDerivation + Send + Sync,
{
Expand Down Expand Up @@ -172,24 +164,23 @@ where
mod tests {
use super::*;
use crate::{
online::test_utils::{spawn_anvil, MockBeaconClient},
online::test_utils::MockBeaconClient,
types::{APIConfigResponse, APIGenesisResponse, APIGetBlobSidecarsResponse},
};
use alloc::vec;
use alloy_primitives::b256;

#[tokio::test]
async fn test_load_config_succeeds() {
let (provider, _anvil) = spawn_anvil();
let genesis_time = 10;
let seconds_per_slot = 12;
let beacon_client = MockBeaconClient {
beacon_genesis: Some(APIGenesisResponse::new(genesis_time)),
config_spec: Some(APIConfigResponse::new(seconds_per_slot)),
..Default::default()
};
let mut blob_provider: OnlineBlobProvider<_, _, SimpleSlotDerivation> =
OnlineBlobProvider::new(provider, true, beacon_client, None, None);
let mut blob_provider: OnlineBlobProvider<_, SimpleSlotDerivation> =
OnlineBlobProvider::new(true, beacon_client, None, None);
let result = blob_provider.load_configs().await;
assert!(result.is_ok());
assert_eq!(blob_provider.genesis_time, Some(genesis_time));
Expand All @@ -198,7 +189,6 @@ mod tests {

#[tokio::test]
async fn test_get_blobs() {
let (provider, _anvil) = spawn_anvil();
let json_bytes = include_bytes!("testdata/eth_v1_beacon_sidecars_goerli.json");
let sidecars: APIGetBlobSidecarsResponse = serde_json::from_slice(json_bytes).unwrap();
let blob_hashes = vec![
Expand Down Expand Up @@ -229,19 +219,18 @@ mod tests {
blob_sidecars: Some(sidecars),
..Default::default()
};
let mut blob_provider: OnlineBlobProvider<_, _, SimpleSlotDerivation> =
OnlineBlobProvider::new(provider, true, beacon_client, None, None);
let mut blob_provider: OnlineBlobProvider<_, SimpleSlotDerivation> =
OnlineBlobProvider::new(true, beacon_client, None, None);
let block_ref = BlockInfo { timestamp: 15, ..Default::default() };
let blobs = blob_provider.get_blobs(&block_ref, &blob_hashes).await.unwrap();
assert_eq!(blobs.len(), 5);
}

#[tokio::test]
async fn test_get_blobs_empty_hashes() {
let (provider, _anvil) = spawn_anvil();
let beacon_client = MockBeaconClient::default();
let mut blob_provider: OnlineBlobProvider<_, _, SimpleSlotDerivation> =
OnlineBlobProvider::new(provider, true, beacon_client, None, None);
let mut blob_provider: OnlineBlobProvider<_, SimpleSlotDerivation> =
OnlineBlobProvider::new(true, beacon_client, None, None);
let block_ref = BlockInfo::default();
let blob_hashes = Vec::new();
let result = blob_provider.get_blobs(&block_ref, &blob_hashes).await;
Expand All @@ -250,10 +239,9 @@ mod tests {

#[tokio::test]
async fn test_get_blobs_beacon_genesis_fetch_fails() {
let (provider, _anvil) = spawn_anvil();
let beacon_client = MockBeaconClient::default();
let mut blob_provider: OnlineBlobProvider<_, _, SimpleSlotDerivation> =
OnlineBlobProvider::new(provider, true, beacon_client, None, None);
let mut blob_provider: OnlineBlobProvider<_, SimpleSlotDerivation> =
OnlineBlobProvider::new(true, beacon_client, None, None);
let block_ref = BlockInfo::default();
let blob_hashes = vec![IndexedBlobHash::default()];
let result = blob_provider.get_blobs(&block_ref, &blob_hashes).await;
Expand All @@ -265,13 +253,12 @@ mod tests {

#[tokio::test]
async fn test_get_blobs_config_spec_fetch_fails() {
let (provider, _anvil) = spawn_anvil();
let beacon_client = MockBeaconClient {
beacon_genesis: Some(APIGenesisResponse::default()),
..Default::default()
};
let mut blob_provider: OnlineBlobProvider<_, _, SimpleSlotDerivation> =
OnlineBlobProvider::new(provider, true, beacon_client, None, None);
let mut blob_provider: OnlineBlobProvider<_, SimpleSlotDerivation> =
OnlineBlobProvider::new(true, beacon_client, None, None);
let block_ref = BlockInfo::default();
let blob_hashes = vec![IndexedBlobHash::default()];
let result = blob_provider.get_blobs(&block_ref, &blob_hashes).await;
Expand All @@ -283,14 +270,13 @@ mod tests {

#[tokio::test]
async fn test_get_blobs_before_genesis_fails() {
let (provider, _anvil) = spawn_anvil();
let beacon_client = MockBeaconClient {
beacon_genesis: Some(APIGenesisResponse::new(10)),
config_spec: Some(APIConfigResponse::new(12)),
..Default::default()
};
let mut blob_provider: OnlineBlobProvider<_, _, SimpleSlotDerivation> =
OnlineBlobProvider::new(provider, true, beacon_client, None, None);
let mut blob_provider: OnlineBlobProvider<_, SimpleSlotDerivation> =
OnlineBlobProvider::new(true, beacon_client, None, None);
let block_ref = BlockInfo { timestamp: 5, ..Default::default() };
let blob_hashes = vec![IndexedBlobHash::default()];
let result = blob_provider.get_blobs(&block_ref, &blob_hashes).await;
Expand All @@ -304,14 +290,13 @@ mod tests {

#[tokio::test]
async fn test_get_blob_sidecars_fetch_fails() {
let (provider, _anvil) = spawn_anvil();
let beacon_client = MockBeaconClient {
beacon_genesis: Some(APIGenesisResponse::new(10)),
config_spec: Some(APIConfigResponse::new(12)),
..Default::default()
};
let mut blob_provider: OnlineBlobProvider<_, _, SimpleSlotDerivation> =
OnlineBlobProvider::new(provider, true, beacon_client, None, None);
let mut blob_provider: OnlineBlobProvider<_, SimpleSlotDerivation> =
OnlineBlobProvider::new(true, beacon_client, None, None);
let block_ref = BlockInfo { timestamp: 15, ..Default::default() };
let blob_hashes = vec![IndexedBlobHash::default()];
let result = blob_provider.get_blobs(&block_ref, &blob_hashes).await;
Expand All @@ -323,7 +308,6 @@ mod tests {

#[tokio::test]
async fn test_get_blob_sidecars_length_mismatch() {
let (provider, _anvil) = spawn_anvil();
let beacon_client = MockBeaconClient {
beacon_genesis: Some(APIGenesisResponse::new(10)),
config_spec: Some(APIConfigResponse::new(12)),
Expand All @@ -332,8 +316,8 @@ mod tests {
}),
..Default::default()
};
let mut blob_provider: OnlineBlobProvider<_, _, SimpleSlotDerivation> =
OnlineBlobProvider::new(provider, true, beacon_client, None, None);
let mut blob_provider: OnlineBlobProvider<_, SimpleSlotDerivation> =
OnlineBlobProvider::new(true, beacon_client, None, None);
let block_ref = BlockInfo { timestamp: 15, ..Default::default() };
let blob_hashes = vec![IndexedBlobHash { index: 1, ..Default::default() }];
let result = blob_provider.get_blobs(&block_ref, &blob_hashes).await;
Expand All @@ -342,7 +326,6 @@ mod tests {

#[tokio::test]
async fn test_get_blobs_invalid_ordering() {
let (provider, _anvil) = spawn_anvil();
let json_bytes = include_bytes!("testdata/eth_v1_beacon_sidecars_goerli.json");
let sidecars: APIGetBlobSidecarsResponse = serde_json::from_slice(json_bytes).unwrap();
let beacon_client = MockBeaconClient {
Expand Down Expand Up @@ -373,8 +356,8 @@ mod tests {
hash: b256!("01df1f9ae707f5847513c9c430b683182079edf2b1f94ee12e4daae7f3c8c309"),
},
];
let mut blob_provider: OnlineBlobProvider<_, _, SimpleSlotDerivation> =
OnlineBlobProvider::new(provider, true, beacon_client, None, None);
let mut blob_provider: OnlineBlobProvider<_, SimpleSlotDerivation> =
OnlineBlobProvider::new(true, beacon_client, None, None);
let block_ref = BlockInfo { timestamp: 15, ..Default::default() };
let result = blob_provider.get_blobs(&block_ref, &blob_hashes).await;
assert_eq!(
Expand All @@ -387,7 +370,6 @@ mod tests {

#[tokio::test]
async fn test_get_blobs_invalid_hash() {
let (provider, _anvil) = spawn_anvil();
let beacon_client = MockBeaconClient {
beacon_genesis: Some(APIGenesisResponse::new(10)),
config_spec: Some(APIConfigResponse::new(12)),
Expand All @@ -396,8 +378,8 @@ mod tests {
}),
..Default::default()
};
let mut blob_provider: OnlineBlobProvider<_, _, SimpleSlotDerivation> =
OnlineBlobProvider::new(provider, true, beacon_client, None, None);
let mut blob_provider: OnlineBlobProvider<_, SimpleSlotDerivation> =
OnlineBlobProvider::new(true, beacon_client, None, None);
let block_ref = BlockInfo { timestamp: 15, ..Default::default() };
let blob_hashes = vec![IndexedBlobHash {
hash: alloy_primitives::FixedBytes::from([1; 32]),
Expand All @@ -409,7 +391,6 @@ mod tests {

#[tokio::test]
async fn test_get_blobs_failed_verification() {
let (provider, _anvil) = spawn_anvil();
let beacon_client = MockBeaconClient {
beacon_genesis: Some(APIGenesisResponse::new(10)),
config_spec: Some(APIConfigResponse::new(12)),
Expand All @@ -418,8 +399,8 @@ mod tests {
}),
..Default::default()
};
let mut blob_provider: OnlineBlobProvider<_, _, SimpleSlotDerivation> =
OnlineBlobProvider::new(provider, true, beacon_client, None, None);
let mut blob_provider: OnlineBlobProvider<_, SimpleSlotDerivation> =
OnlineBlobProvider::new(true, beacon_client, None, None);
let block_ref = BlockInfo { timestamp: 15, ..Default::default() };
let blob_hashes = vec![IndexedBlobHash {
hash: b256!("01b0761f87b081d5cf10757ccc89f12be355c70e2e29df288b65b30710dcbcd1"),
Expand Down
Loading

0 comments on commit a606d08

Please sign in to comment.