Skip to content

Commit

Permalink
feat(examples): Trusted Sync Metrics (#308)
Browse files Browse the repository at this point in the history
* feat: trusted sync metrics

* feat: online derive provider metrics

* fix: chain provider metrics

* feat(derive): blob provider metrics

* fix: make lazy-static optional
  • Loading branch information
refcell authored Jun 25, 2024
1 parent 9788c80 commit 8a83559
Show file tree
Hide file tree
Showing 12 changed files with 891 additions and 87 deletions.
505 changes: 496 additions & 9 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions crates/derive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ alloc-no-stdlib = "2.0.4"
serde = { version = "1.0.203", default-features = false, features = ["derive"], optional = true }

# `online` feature dependencies
lazy_static = { version = "1.5.0", optional = true }
prometheus = { version = "0.13.4", features = ["process"], optional = true }
c-kzg = { version = "1.0.2", default-features = false, optional = true }
sha2 = { version = "0.10.8", default-features = false, optional = true }
alloy-transport = { version = "0.1", default-features = false, optional = true }
Expand Down Expand Up @@ -81,6 +83,8 @@ online = [
"dep:alloy-transport",
"dep:alloy-transport-http",
"dep:reqwest",
"dep:prometheus",
"dep:lazy_static",
"alloy-provider/reqwest",
"alloy-rpc-client/reqwest",
"alloy-transport-http/reqwest",
Expand Down
155 changes: 140 additions & 15 deletions crates/derive/src/online/alloy_providers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,25 +65,61 @@ impl AlloyChainProvider {
#[async_trait]
impl ChainProvider for AlloyChainProvider {
async fn header_by_hash(&mut self, hash: B256) -> Result<Header> {
let timer = crate::online::metrics::PROVIDER_RESPONSE_TIME
.with_label_values(&["chain_provider", "header_by_hash"])
.start_timer();
if let Some(header) = self.header_by_hash_cache.get(&hash) {
timer.observe_duration();
return Ok(header.clone());
}

let raw_header: TransportResult<Bytes> =
self.inner.raw_request("debug_getRawHeader".into(), [hash]).await;
let raw_header: Bytes = raw_header.map_err(|e| anyhow!(e))?;
Header::decode(&mut raw_header.as_ref()).map_err(|e| anyhow!(e))
let raw_header: Bytes = match raw_header.map_err(|e| anyhow!(e)) {
Ok(b) => b,
Err(e) => {
timer.observe_duration();
return Err(e);
}
};
match Header::decode(&mut raw_header.as_ref()).map_err(|e| anyhow!(e)) {
Ok(header) => {
self.header_by_hash_cache.put(hash, header.clone());
timer.observe_duration();
Ok(header)
}
Err(e) => {
timer.observe_duration();
Err(e)
}
}
}

async fn block_info_by_number(&mut self, number: u64) -> Result<BlockInfo> {
let timer = crate::online::metrics::PROVIDER_RESPONSE_TIME
.with_label_values(&["chain_provider", "block_info_by_number"])
.start_timer();
if let Some(block_info) = self.block_info_by_number_cache.get(&number) {
timer.observe_duration();
return Ok(*block_info);
}

let raw_header: TransportResult<Bytes> =
self.inner.raw_request("debug_getRawHeader".into(), [U64::from(number)]).await;
let raw_header: Bytes = raw_header.map_err(|e| anyhow!(e))?;
let header = Header::decode(&mut raw_header.as_ref()).map_err(|e| anyhow!(e))?;
let raw_header: Bytes = match raw_header.map_err(|e| anyhow!(e)) {
Ok(b) => b,
Err(e) => {
timer.observe_duration();
return Err(e);
}
};
let header = match Header::decode(&mut raw_header.as_ref()).map_err(|e| anyhow!(e)) {
Ok(h) => h,
Err(e) => {
timer.observe_duration();
return Err(e);
}
};

let block_info = BlockInfo {
hash: header.hash_slow(),
Expand All @@ -92,19 +128,30 @@ impl ChainProvider for AlloyChainProvider {
timestamp: header.timestamp,
};
self.block_info_by_number_cache.put(number, block_info);
timer.observe_duration();
Ok(block_info)
}

async fn receipts_by_hash(&mut self, hash: B256) -> Result<Vec<Receipt>> {
let timer = crate::online::metrics::PROVIDER_RESPONSE_TIME
.with_label_values(&["chain_provider", "receipts_by_hash"])
.start_timer();
if let Some(receipts) = self.receipts_by_hash_cache.get(&hash) {
timer.observe_duration();
return Ok(receipts.clone());
}

let raw_receipts: TransportResult<Vec<Bytes>> =
self.inner.raw_request("debug_getRawReceipts".into(), [hash]).await;
let raw_receipts: Vec<Bytes> = raw_receipts.map_err(|e| anyhow!(e))?;
let raw_receipts: Vec<Bytes> = match raw_receipts.map_err(|e| anyhow!(e)) {
Ok(r) => r,
Err(e) => {
timer.observe_duration();
return Err(e);
}
};

let receipts = raw_receipts
let receipts = match raw_receipts
.iter()
.map(|r| {
let r = &mut r.as_ref();
Expand All @@ -116,24 +163,48 @@ impl ChainProvider for AlloyChainProvider {

Ok(ReceiptWithBloom::decode(r).map_err(|e| anyhow!(e))?.receipt)
})
.collect::<Result<Vec<_>>>()?;
.collect::<Result<Vec<_>>>()
{
Ok(r) => r,
Err(e) => {
timer.observe_duration();
return Err(e);
}
};
self.receipts_by_hash_cache.put(hash, receipts.clone());
timer.observe_duration();
Ok(receipts)
}

async fn block_info_and_transactions_by_hash(
&mut self,
hash: B256,
) -> Result<(BlockInfo, Vec<TxEnvelope>)> {
let timer = crate::online::metrics::PROVIDER_RESPONSE_TIME
.with_label_values(&["chain_provider", "block_info_and_transactions_by_hash"])
.start_timer();
if let Some(block_info_and_txs) = self.block_info_and_transactions_by_hash_cache.get(&hash)
{
timer.observe_duration();
return Ok(block_info_and_txs.clone());
}

let raw_block: TransportResult<Bytes> =
self.inner.raw_request("debug_getRawBlock".into(), [hash]).await;
let raw_block: Bytes = raw_block.map_err(|e| anyhow!(e))?;
let block = Block::decode(&mut raw_block.as_ref()).map_err(|e| anyhow!(e))?;
let raw_block: Bytes = match raw_block.map_err(|e| anyhow!(e)) {
Ok(b) => b,
Err(e) => {
timer.observe_duration();
return Err(e);
}
};
let block = match Block::decode(&mut raw_block.as_ref()).map_err(|e| anyhow!(e)) {
Ok(b) => b,
Err(e) => {
timer.observe_duration();
return Err(e);
}
};

let block_info = BlockInfo {
hash: block.header.hash_slow(),
Expand All @@ -142,6 +213,7 @@ impl ChainProvider for AlloyChainProvider {
timestamp: block.header.timestamp,
};
self.block_info_and_transactions_by_hash_cache.put(hash, (block_info, block.body.clone()));
timer.observe_duration();
Ok((block_info, block.body))
}
}
Expand Down Expand Up @@ -199,28 +271,62 @@ impl AlloyL2ChainProvider {
#[async_trait]
impl L2ChainProvider for AlloyL2ChainProvider {
async fn l2_block_info_by_number(&mut self, number: u64) -> Result<L2BlockInfo> {
let timer = crate::online::metrics::PROVIDER_RESPONSE_TIME
.with_label_values(&["l2_chain_provider", "l2_block_info_by_number"])
.start_timer();
if let Some(l2_block_info) = self.l2_block_info_by_number_cache.get(&number) {
timer.observe_duration();
return Ok(*l2_block_info);
}

let payload = self.payload_by_number(number).await?;
let l2_block_info = payload.to_l2_block_ref(self.rollup_config.as_ref())?;
let payload = match self.payload_by_number(number).await {
Ok(p) => p,
Err(e) => {
timer.observe_duration();
return Err(e);
}
};
let l2_block_info = match payload.to_l2_block_ref(self.rollup_config.as_ref()) {
Ok(b) => b,
Err(e) => {
timer.observe_duration();
return Err(e);
}
};
self.l2_block_info_by_number_cache.put(number, l2_block_info);
timer.observe_duration();
Ok(l2_block_info)
}

async fn payload_by_number(&mut self, number: u64) -> Result<L2ExecutionPayloadEnvelope> {
let timer = crate::online::metrics::PROVIDER_RESPONSE_TIME
.with_label_values(&["l2_chain_provider", "payload_by_number"])
.start_timer();
if let Some(payload) = self.payload_by_number_cache.get(&number) {
timer.observe_duration();
return Ok(payload.clone());
}

let raw_block: TransportResult<Bytes> =
self.inner.raw_request("debug_getRawBlock".into(), [U64::from(number)]).await;
let raw_block: Bytes = raw_block.map_err(|e| anyhow!(e))?;
let block = OpBlock::decode(&mut raw_block.as_ref()).map_err(|e| anyhow!(e))?;
let raw_block: Bytes = match raw_block.map_err(|e| anyhow!(e)) {
Ok(b) => b,
Err(e) => {
timer.observe_duration();
return Err(e);
}
};
let block = match OpBlock::decode(&mut raw_block.as_ref()).map_err(|e| anyhow!(e)) {
Ok(b) => b,
Err(e) => {
timer.observe_duration();
return Err(e);
}
};
let payload_envelope: L2ExecutionPayloadEnvelope = block.into();

self.payload_by_number_cache.put(number, payload_envelope.clone());
timer.observe_duration();
Ok(payload_envelope)
}

Expand All @@ -229,11 +335,30 @@ impl L2ChainProvider for AlloyL2ChainProvider {
number: u64,
rollup_config: Arc<RollupConfig>,
) -> Result<SystemConfig> {
let timer = crate::online::metrics::PROVIDER_RESPONSE_TIME
.with_label_values(&["l2_chain_provider", "system_config_by_number"])
.start_timer();
if let Some(system_config) = self.system_config_by_number_cache.get(&number) {
timer.observe_duration();
return Ok(system_config.clone());
}

let envelope = self.payload_by_number(number).await?;
envelope.to_system_config(&rollup_config)
let envelope = match self.payload_by_number(number).await {
Ok(e) => e,
Err(e) => {
timer.observe_duration();
return Err(e);
}
};
let sys_config = match envelope.to_system_config(&rollup_config) {
Ok(s) => s,
Err(e) => {
timer.observe_duration();
return Err(e);
}
};
self.system_config_by_number_cache.put(number, sys_config.clone());
timer.observe_duration();
Ok(sys_config)
}
}
64 changes: 51 additions & 13 deletions crates/derive/src/online/beacon_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,41 +56,78 @@ impl OnlineBeaconClient {
#[async_trait]
impl BeaconClient for OnlineBeaconClient {
async fn config_spec(&self) -> Result<APIConfigResponse> {
self.inner
let timer = crate::online::metrics::PROVIDER_RESPONSE_TIME
.with_label_values(&["beacon_client", "config_spec"])
.start_timer();
let first = match self
.inner
.get(format!("{}/{}", self.base, SPEC_METHOD))
.send()
.await
.map_err(|e| anyhow!(e))?
.json::<APIConfigResponse>()
.await
.map_err(|e| anyhow!(e))
{
Ok(response) => response,
Err(e) => {
timer.observe_duration();
return Err(e);
}
};
let res = first.json::<APIConfigResponse>().await.map_err(|e| anyhow!(e));
timer.observe_duration();
res
}

async fn beacon_genesis(&self) -> Result<APIGenesisResponse> {
self.inner
let timer = crate::online::metrics::PROVIDER_RESPONSE_TIME
.with_label_values(&["beacon_client", "beacon_genesis"])
.start_timer();
let first = match self
.inner
.get(format!("{}/{}", self.base, GENESIS_METHOD))
.send()
.await
.map_err(|e| anyhow!(e))?
.json::<APIGenesisResponse>()
.await
.map_err(|e| anyhow!(e))
{
Ok(response) => response,
Err(e) => {
timer.observe_duration();
return Err(e);
}
};
let res = first.json::<APIGenesisResponse>().await.map_err(|e| anyhow!(e));
timer.observe_duration();
res
}

async fn beacon_blob_side_cars(
&self,
slot: u64,
hashes: &[IndexedBlobHash],
) -> Result<Vec<APIBlobSidecar>> {
let raw_response = self
let timer = crate::online::metrics::PROVIDER_RESPONSE_TIME
.with_label_values(&["beacon_client", "beacon_blob_side_cars"])
.start_timer();
let raw_response = match self
.inner
.get(format!("{}/{}/{}", self.base, SIDECARS_METHOD_PREFIX, slot))
.send()
.await
.map_err(|e| anyhow!(e))?
.json::<APIGetBlobSidecarsResponse>()
.await
.map_err(|e| anyhow!(e))?;
.map_err(|e| anyhow!(e))
{
Ok(response) => response,
Err(e) => {
timer.observe_duration();
return Err(e);
}
};
let raw_response =
match raw_response.json::<APIGetBlobSidecarsResponse>().await.map_err(|e| anyhow!(e)) {
Ok(response) => response,
Err(e) => {
timer.observe_duration();
return Err(e);
}
};

let mut sidecars = Vec::with_capacity(hashes.len());

Expand All @@ -103,6 +140,7 @@ impl BeaconClient for OnlineBeaconClient {
}
});

timer.observe_duration();
Ok(sidecars)
}
}
Loading

0 comments on commit 8a83559

Please sign in to comment.