Skip to content

Commit

Permalink
feat(derive): Online ChainProvider (#93)
Browse files Browse the repository at this point in the history
* feat(derive): Online `ChainProvider` impl

lint

* feat(derive): Add start of `L2SafeBlockProvider`

* feat(derive): Add caching to providers

* chore(derive): Rename `alloy-providers` feature to `online`

* chore(derive): rebase
  • Loading branch information
clabby authored Apr 8, 2024
1 parent 56fa3ba commit 74a838a
Show file tree
Hide file tree
Showing 19 changed files with 1,985 additions and 150 deletions.
1,606 changes: 1,561 additions & 45 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 13 additions & 1 deletion crates/derive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,16 @@ async-trait = "0.1.77"
hashbrown = "0.14.3"
unsigned-varint = "0.8.0"
miniz_oxide = { version = "0.7.2" }
lru = "0.12.3"

# Optional
# `serde` feature dependencies
serde = { version = "1.0.197", default-features = false, features = ["derive"], optional = true }

# `online` feature dependencies
alloy-provider = { git = "https://github.com/alloy-rs/alloy", rev = "e3f2f07", optional = true}
alloy-transport-http = { git = "https://github.com/alloy-rs/alloy", rev = "e3f2f07", optional = true }
reqwest = { version = "0.12", default-features = false, optional = true }

[dev-dependencies]
tokio = { version = "1.36", features = ["full"] }
proptest = "1.4.0"
Expand All @@ -37,3 +43,9 @@ tracing-subscriber = "0.3.18"
default = ["serde", "k256"]
serde = ["dep:serde", "alloy-primitives/serde"]
k256 = ["alloy-primitives/k256", "alloy-consensus/k256"]
online = [
"dep:alloy-provider",
"dep:alloy-transport-http",
"dep:reqwest",
"alloy-consensus/serde"
]
195 changes: 195 additions & 0 deletions crates/derive/src/alloy_providers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
//! This module contains concrete implementations of the data provider traits, using an alloy
//! provider on the backend.
use crate::{
traits::{ChainProvider, L2ChainProvider},
types::{Block, BlockInfo, ExecutionPayloadEnvelope, L2BlockInfo, RollupConfig},
};
use alloc::{boxed::Box, sync::Arc, vec::Vec};
use alloy_consensus::{Header, Receipt, ReceiptWithBloom, TxEnvelope, TxType};
use alloy_primitives::{Bytes, B256, U64};
use alloy_provider::Provider;
use alloy_rlp::{Buf, Decodable};
use alloy_transport_http::Http;
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use core::num::NonZeroUsize;
use lru::LruCache;

const CACHE_SIZE: usize = 16;

/// The [AlloyChainProvider] is a concrete implementation of the [ChainProvider] trait, providing
/// data over Ethereum JSON-RPC using an alloy provider as the backend.
///
/// **Note**:
/// This provider fetches data using the `debug_getRawHeader`, `debug_getRawReceipts`, and
/// `debug_getRawBlock` methods. The RPC must support this namespace.
#[derive(Debug)]
pub struct AlloyChainProvider<T: Provider<Http<reqwest::Client>>> {
/// The inner Ethereum JSON-RPC provider.
inner: T,
/// `block_info_by_number` LRU cache.
block_info_by_number_cache: LruCache<u64, BlockInfo>,
/// `block_info_by_number` LRU cache.
receipts_by_hash_cache: LruCache<B256, Vec<Receipt>>,
/// `block_info_and_transactions_by_hash` LRU cache.
block_info_and_transactions_by_hash_cache: LruCache<B256, (BlockInfo, Vec<TxEnvelope>)>,
}

impl<T: Provider<Http<reqwest::Client>>> AlloyChainProvider<T> {
/// Creates a new [AlloyChainProvider] with the given alloy provider.
pub fn new(inner: T) -> Self {
Self {
inner,
block_info_by_number_cache: LruCache::new(NonZeroUsize::new(CACHE_SIZE).unwrap()),
receipts_by_hash_cache: LruCache::new(NonZeroUsize::new(CACHE_SIZE).unwrap()),
block_info_and_transactions_by_hash_cache: LruCache::new(
NonZeroUsize::new(CACHE_SIZE).unwrap(),
),
}
}
}

#[async_trait]
impl<T: Provider<Http<reqwest::Client>>> ChainProvider for AlloyChainProvider<T> {
async fn block_info_by_number(&mut self, number: u64) -> Result<BlockInfo> {
if let Some(block_info) = self.block_info_by_number_cache.get(&number) {
return Ok(*block_info);
}

let raw_header: Bytes = self
.inner
.client()
.request("debug_getRawHeader", [U64::from(number)])
.await
.map_err(|e| anyhow!(e))?;
let header = Header::decode(&mut raw_header.as_ref()).map_err(|e| anyhow!(e))?;

let block_info = BlockInfo {
hash: header.hash_slow(),
number,
parent_hash: header.parent_hash,
timestamp: header.timestamp,
};
self.block_info_by_number_cache.put(number, block_info);
Ok(block_info)
}

async fn receipts_by_hash(&mut self, hash: B256) -> Result<Vec<Receipt>> {
if let Some(receipts) = self.receipts_by_hash_cache.get(&hash) {
return Ok(receipts.clone());
}

let raw_receipts: Vec<Bytes> = self
.inner
.client()
.request("debug_getRawReceipts", [hash])
.await
.map_err(|e| anyhow!(e))?;

let receipts = raw_receipts
.iter()
.map(|r| {
let r = &mut r.as_ref();

// Skip the transaction type byte if it exists
if !r.is_empty() && r[0] <= TxType::Eip4844 as u8 {
r.advance(1);
}

Ok(ReceiptWithBloom::decode(r).map_err(|e| anyhow!(e))?.receipt)
})
.collect::<Result<Vec<_>>>()?;
self.receipts_by_hash_cache.put(hash, receipts.clone());
Ok(receipts)
}

async fn block_info_and_transactions_by_hash(
&mut self,
hash: B256,
) -> Result<(BlockInfo, Vec<TxEnvelope>)> {
if let Some(block_info_and_txs) = self.block_info_and_transactions_by_hash_cache.get(&hash)
{
return Ok(block_info_and_txs.clone());
}

let raw_block: Bytes = self
.inner
.client()
.request("debug_getRawBlock", [hash])
.await
.map_err(|e| anyhow!(e))?;
let block = Block::decode(&mut raw_block.as_ref()).map_err(|e| anyhow!(e))?;

let block_info = BlockInfo {
hash: block.header.hash_slow(),
number: block.header.number,
parent_hash: block.header.parent_hash,
timestamp: block.header.timestamp,
};
self.block_info_and_transactions_by_hash_cache.put(hash, (block_info, block.body.clone()));
Ok((block_info, block.body))
}
}

/// The [AlloyL2SafeHeadProvider] is a concrete implementation of the [L2ChainProvider] trait,
/// providing data over Ethereum JSON-RPC using an alloy provider as the backend.
///
/// **Note**:
/// This provider fetches data using the `debug_getRawBlock` method. The RPC must support this
/// namespace.
#[derive(Debug)]
pub struct AlloyL2SafeHeadProvider<T: Provider<Http<reqwest::Client>>> {
/// The inner Ethereum JSON-RPC provider.
inner: T,
/// The rollup configuration.
rollup_config: Arc<RollupConfig>,
/// `payload_by_number` LRU cache.
payload_by_number_cache: LruCache<u64, ExecutionPayloadEnvelope>,
/// `l2_block_info_by_number` LRU cache.
l2_block_info_by_number_cache: LruCache<u64, L2BlockInfo>,
}

impl<T: Provider<Http<reqwest::Client>>> AlloyL2SafeHeadProvider<T> {
/// Creates a new [AlloyL2SafeHeadProvider] with the given alloy provider and [RollupConfig].
pub fn new(inner: T, rollup_config: Arc<RollupConfig>) -> Self {
Self {
inner,
rollup_config,
payload_by_number_cache: LruCache::new(NonZeroUsize::new(CACHE_SIZE).unwrap()),
l2_block_info_by_number_cache: LruCache::new(NonZeroUsize::new(CACHE_SIZE).unwrap()),
}
}
}

#[async_trait]
impl<T: Provider<Http<reqwest::Client>>> L2ChainProvider for AlloyL2SafeHeadProvider<T> {
async fn l2_block_info_by_number(&mut self, number: u64) -> Result<L2BlockInfo> {
if let Some(l2_block_info) = self.l2_block_info_by_number_cache.get(&number) {
return Ok(*l2_block_info);
}

let payload = self.payload_by_number(number).await?;
let l2_block_info = payload.to_l2_block_ref(self.rollup_config.as_ref())?;
self.l2_block_info_by_number_cache.put(number, l2_block_info);
Ok(l2_block_info)
}

async fn payload_by_number(&mut self, number: u64) -> Result<ExecutionPayloadEnvelope> {
if let Some(payload) = self.payload_by_number_cache.get(&number) {
return Ok(payload.clone());
}

let raw_block: Bytes = self
.inner
.client()
.request("debug_getRawBlock", [U64::from(number)])
.await
.map_err(|e| anyhow!(e))?;
let block = Block::decode(&mut raw_block.as_ref()).map_err(|e| anyhow!(e))?;
let payload_envelope: ExecutionPayloadEnvelope = block.into();

self.payload_by_number_cache.put(number, payload_envelope.clone());
Ok(payload_envelope)
}
}
15 changes: 4 additions & 11 deletions crates/derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ pub mod stages;
pub mod traits;
pub mod types;

#[cfg(feature = "online")]
pub mod alloy_providers;

/// The derivation pipeline is responsible for deriving L2 inputs from L1 data.
#[derive(Debug, Clone, Copy)]
pub struct DerivationPipeline;
Expand All @@ -33,16 +36,6 @@ impl DerivationPipeline {
where
P: ChainProvider + Clone + Debug + Send,
{
// let l1_traversal = L1Traversal::new(chain_provider, rollup_config.clone(),
// telemetry.clone()); let l1_retrieval = L1Retrieval::new(l1_traversal, dap_source,
// telemetry.clone()); let frame_queue = FrameQueue::new(l1_retrieval,
// telemetry.clone()); let channel_bank = ChannelBank::new(rollup_config.clone(),
// frame_queue, telemetry.clone()); let channel_reader =
// ChannelReader::new(channel_bank, telemetry.clone()); let batch_queue =
// BatchQueue::new(rollup_config.clone(), channel_reader, telemetry.clone(), fetcher);
// let attributes_queue = AttributesQueue::new(rollup_config.clone(), batch_queue,
// telemetry.clone(), builder);

unimplemented!()
unimplemented!("TODO: High-level pipeline composition helper.")
}
}
12 changes: 6 additions & 6 deletions crates/derive/src/stages/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use crate::{
stages::attributes_queue::AttributesProvider,
traits::{OriginProvider, ResettableStage, SafeBlockFetcher},
traits::{L2ChainProvider, OriginProvider, ResettableStage},
types::{
Batch, BatchValidity, BatchWithInclusionBlock, BlockInfo, L2BlockInfo, RollupConfig,
SingleBatch, StageError, StageResult, SystemConfig,
Expand Down Expand Up @@ -42,7 +42,7 @@ pub trait BatchQueueProvider {
pub struct BatchQueue<P, BF>
where
P: BatchQueueProvider + OriginProvider + Debug,
BF: SafeBlockFetcher + Debug,
BF: L2ChainProvider + Debug,
{
/// The rollup config.
cfg: RollupConfig,
Expand Down Expand Up @@ -72,7 +72,7 @@ where
impl<P, BF> BatchQueue<P, BF>
where
P: BatchQueueProvider + OriginProvider + Debug,
BF: SafeBlockFetcher + Debug,
BF: L2ChainProvider + Debug,
{
/// Creates a new [BatchQueue] stage.
pub fn new(cfg: RollupConfig, prev: P, fetcher: BF) -> Self {
Expand Down Expand Up @@ -237,7 +237,7 @@ where
impl<P, BF> AttributesProvider for BatchQueue<P, BF>
where
P: BatchQueueProvider + OriginProvider + Send + Debug,
BF: SafeBlockFetcher + Send + Debug,
BF: L2ChainProvider + Send + Debug,
{
/// Returns the next valid batch upon the given safe head.
/// Also returns the boolean that indicates if the batch is the last block in the batch.
Expand Down Expand Up @@ -362,7 +362,7 @@ where
impl<P, BF> OriginProvider for BatchQueue<P, BF>
where
P: BatchQueueProvider + OriginProvider + Debug,
BF: SafeBlockFetcher + Debug,
BF: L2ChainProvider + Debug,
{
fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
Expand All @@ -373,7 +373,7 @@ where
impl<P, BF> ResettableStage for BatchQueue<P, BF>
where
P: BatchQueueProvider + OriginProvider + Send + Debug,
BF: SafeBlockFetcher + Send + Debug,
BF: L2ChainProvider + Send + Debug,
{
async fn reset(&mut self, base: BlockInfo, _: &SystemConfig) -> StageResult<()> {
// Copy over the Origin from the next stage.
Expand Down
12 changes: 3 additions & 9 deletions crates/derive/src/stages/channel_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,9 @@ impl BatchReader {
}
}

impl From<&[u8]> for BatchReader {
fn from(data: &[u8]) -> Self {
Self { data: Some(data.to_vec()), decompressed: Vec::new(), cursor: 0 }
}
}

impl From<Vec<u8>> for BatchReader {
fn from(data: Vec<u8>) -> Self {
Self { data: Some(data), decompressed: Vec::new(), cursor: 0 }
impl<T: Into<Vec<u8>>> From<T> for BatchReader {
fn from(data: T) -> Self {
Self { data: Some(data.into()), decompressed: Vec::new(), cursor: 0 }
}
}

Expand Down
Loading

0 comments on commit 74a838a

Please sign in to comment.