diff --git a/crates/tree_availability/src/lib.rs b/crates/tree_availability/src/lib.rs index 7bcc49d6..41f4c82d 100644 --- a/crates/tree_availability/src/lib.rs +++ b/crates/tree_availability/src/lib.rs @@ -51,6 +51,8 @@ impl TreeAvailabilityService { let mut handles = vec![]; // Initialize a new router and spawn the server + tracing::info!("Iniitalizing axum server on port {port}"); + let router = axum::Router::new() .route("/inclusionProof", axum::routing::post(inclusion_proof)) .route("/synced", axum::routing::post(synced)) @@ -60,10 +62,12 @@ impl TreeAvailabilityService { SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port); let server_handle = tokio::spawn(async move { + tracing::info!("Spawning server"); axum::Server::bind(&address) .serve(router.into_make_service()) .await .map_err(TreeAvailabilityError::HyperError)?; + tracing::info!("Server spawned"); Ok(()) }); diff --git a/crates/tree_availability/src/server.rs b/crates/tree_availability/src/server.rs deleted file mode 100644 index 4a48a6a3..00000000 --- a/crates/tree_availability/src/server.rs +++ /dev/null @@ -1,143 +0,0 @@ -use std::sync::atomic::Ordering; -use std::sync::Arc; - -use axum::extract::State; -use axum::http::StatusCode; -use axum::response::IntoResponse; -use axum::Json; -use ethers::providers::Middleware; -use semaphore::poseidon_tree::{Branch, Proof}; -use semaphore::Field; -use serde::de::Error; -use serde::{Deserialize, Deserializer, Serialize}; -use serde_json::Value; - -use crate::error::TreeError; -use crate::world_tree::{Hash, WorldTree}; - -#[derive(Serialize, Deserialize)] -#[serde(rename_all = "camelCase", deny_unknown_fields)] -pub struct InclusionProofRequest { - pub identity_commitment: Hash, - pub root: Option, -} - -impl InclusionProofRequest { - pub fn new( - identity_commitment: Hash, - root: Option, - ) -> InclusionProofRequest { - Self { - identity_commitment, - root, - } - } -} - -#[derive(Debug, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct InclusionProof { - pub root: Field, - //TODO: Implement `Deserialize` for Proof within semaphore-rs instead of using `deserialize_with` - #[serde(deserialize_with = "deserialize_proof")] - pub proof: Proof, - pub message: Option, -} - -impl InclusionProof { - pub fn new( - root: Field, - proof: Proof, - message: Option, - ) -> InclusionProof { - Self { - root, - proof, - message, - } - } -} - -fn deserialize_proof<'de, D>(deserializer: D) -> Result -where - D: Deserializer<'de>, -{ - let value: Value = Deserialize::deserialize(deserializer)?; - if let Value::Array(array) = value { - let mut branches = vec![]; - for value in array { - let branch = serde_json::from_value::(value) - .map_err(serde::de::Error::custom)?; - branches.push(branch); - } - - Ok(semaphore::merkle_tree::Proof(branches)) - } else { - Err(D::Error::custom("Expected an array")) - } -} - -pub async fn inclusion_proof( - State(world_tree): State>>, - Json(req): Json, -) -> Result<(StatusCode, Json>), TreeError> { - if world_tree.tree_updater.synced.load(Ordering::Relaxed) { - let inclusion_proof = world_tree - .tree_data - .get_inclusion_proof(req.identity_commitment, req.root) - .await; - - Ok((StatusCode::OK, inclusion_proof.into())) - } else { - Err(TreeError::TreeNotSynced) - } -} - -#[derive(Debug, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct SyncResponse { - pub synced: bool, - pub block_number: Option, -} - -impl SyncResponse { - pub fn new(synced: bool, block_number: Option) -> SyncResponse { - Self { - synced, - block_number, - } - } -} - -pub async fn synced( - State(world_tree): State>>, -) -> (StatusCode, Json) { - if world_tree.tree_updater.synced.load(Ordering::Relaxed) { - (StatusCode::OK, SyncResponse::new(true, None).into()) - } else { - let latest_synced_block = Some( - world_tree - .tree_updater - .latest_synced_block - .load(Ordering::SeqCst), - ); - ( - StatusCode::OK, - SyncResponse::new(false, latest_synced_block).into(), - ) - } -} - -impl TreeError { - fn to_status_code(&self) -> StatusCode { - //TODO: update this - StatusCode::BAD_REQUEST - } -} -impl IntoResponse for TreeError { - fn into_response(self) -> axum::response::Response { - let status_code = self.to_status_code(); - let response_body = self.to_string(); - (status_code, response_body).into_response() - } -} diff --git a/crates/tree_availability/src/server/middleware/logging_layer.rs b/crates/tree_availability/src/server/middleware/logging_layer.rs index 69bce182..388cadb7 100644 --- a/crates/tree_availability/src/server/middleware/logging_layer.rs +++ b/crates/tree_availability/src/server/middleware/logging_layer.rs @@ -1,223 +1,223 @@ -#![allow(clippy::cast_possible_truncation)] - -use axum::http::{Request, StatusCode}; -use axum::middleware::Next; -use axum::response::Response; -use bytes::Bytes; -use hyper::body::HttpBody; -use hyper::{Body, Method}; -use tracing::{error, info, info_span, warn, Instrument}; - -// 1 MiB -const MAX_REQUEST_BODY_SIZE: u64 = 1024 * 1024; - -pub async fn middleware( - request: Request, - next: Next, -) -> Result -where - B: HttpBody, - ::Error: std::error::Error, -{ - let (parts, body) = request.into_parts(); - - let uri_path = parts.uri.path().to_string(); - let request_method = parts.method.clone(); - let request_query = parts.uri.query().map(ToString::to_string); - - if let Method::GET = request_method { - let span = - info_span!("request", ?uri_path, ?request_method, ?request_query); - - async { - cli_batteries::trace_from_headers(&parts.headers); - - info!( - uri_path, - ?request_method, - ?request_query, - "Processing request" - ); - - let body = Body::empty(); - let request = Request::from_parts(parts, body); - - let response = next.run(request).await; - - let mut response = handle_response( - &uri_path, - &request_method, - request_query.as_deref(), - response, - ) - .await?; - - cli_batteries::trace_to_headers(response.headers_mut()); - - Ok(response) - } - .instrument(span) - .await - } else { - let body = body_to_string(body).await?; - - let span = info_span!( - "request", - ?uri_path, - ?request_method, - ?request_query, - ?body - ); - - async { - cli_batteries::trace_from_headers(&parts.headers); - - info!( - ?uri_path, - ?request_method, - ?request_query, - ?body, - "Processing request" - ); - - let body = Body::from(body); - let request = Request::from_parts(parts, body); - - let response = next.run(request).await; - - let mut response = handle_response( - &uri_path, - &request_method, - request_query.as_deref(), - response, - ) - .await?; - - cli_batteries::trace_to_headers(response.headers_mut()); - - Ok(response) - } - .instrument(span) - .await - } -} - -async fn handle_response( - uri_path: &str, - request_method: &Method, - request_query: Option<&str>, - response: Response, -) -> Result { - let (parts, body) = response.into_parts(); - - let response_status = parts.status; - - let response = if response_status.is_client_error() - || response_status.is_server_error() - { - let response_body = body_to_string(body).await?; - - if response_status.is_client_error() { - warn!( - uri_path, - ?request_method, - ?request_query, - ?response_status, - ?response_body, - "Error processing request" - ); - } else { - error!( - uri_path, - ?request_method, - ?request_query, - ?response_status, - ?response_body, - "Error processing request" - ); - } - - let body = axum::body::boxed(Body::from(response_body)); - - Response::from_parts(parts, body) - } else { - Response::from_parts(parts, body) - }; - - info!( - uri_path, - ?request_method, - ?request_query, - ?response_status, - "Finished processing request" - ); - - Ok(response) -} - -/// Reads the body of a request into a `Bytes` object chunk by chunk -/// and returns an error if the body is larger than `MAX_REQUEST_BODY_SIZE`. -async fn body_to_bytes_safe(body: B) -> Result -where - B: HttpBody, - ::Error: std::error::Error, -{ - use bytes::BufMut; - - let size_hint = body - .size_hint() - .upper() - .unwrap_or_else(|| body.size_hint().lower()); - - if size_hint > MAX_REQUEST_BODY_SIZE { - error!( - "Request body too large: {} bytes (max: {} bytes)", - size_hint, MAX_REQUEST_BODY_SIZE - ); - - return Err(StatusCode::PAYLOAD_TOO_LARGE); - } - - let mut body_bytes = Vec::with_capacity(size_hint as usize); - - futures_util::pin_mut!(body); - - while let Some(chunk) = body.data().await { - let chunk = chunk.map_err(|error| { - error!("Error reading body: {}", error); - StatusCode::INTERNAL_SERVER_ERROR - })?; - - body_bytes.put(chunk); - - if body_bytes.len() > MAX_REQUEST_BODY_SIZE as usize { - error!( - "Request body too large: {} bytes (max: {} bytes)", - body_bytes.len(), - MAX_REQUEST_BODY_SIZE - ); - - return Err(StatusCode::PAYLOAD_TOO_LARGE); - } - } - - Ok(body_bytes.into()) -} - -async fn body_to_string(body: B) -> Result -where - B: HttpBody, - ::Error: std::error::Error, -{ - let body_bytes = body_to_bytes_safe(body).await?; - - let s = match String::from_utf8(body_bytes.to_vec()) { - Ok(s) => s, - Err(error) => { - error!("Error converting body to string: {}", error); - return Err(StatusCode::BAD_REQUEST); - } - }; - - Ok(s) -} +// #![allow(clippy::cast_possible_truncation)] + +// use axum::http::{Request, StatusCode}; +// use axum::middleware::Next; +// use axum::response::Response; +// use bytes::Bytes; +// use hyper::body::HttpBody; +// use hyper::{Body, Method}; +// use tracing::{error, info, info_span, warn, Instrument}; + +// // 1 MiB +// const MAX_REQUEST_BODY_SIZE: u64 = 1024 * 1024; + +// pub async fn middleware( +// request: Request, +// next: Next, +// ) -> Result +// where +// B: HttpBody, +// ::Error: std::error::Error, +// { +// let (parts, body) = request.into_parts(); + +// let uri_path = parts.uri.path().to_string(); +// let request_method = parts.method.clone(); +// let request_query = parts.uri.query().map(ToString::to_string); + +// if let Method::GET = request_method { +// let span = +// info_span!("request", ?uri_path, ?request_method, ?request_query); + +// async { +// cli_batteries::trace_from_headers(&parts.headers); + +// info!( +// uri_path, +// ?request_method, +// ?request_query, +// "Processing request" +// ); + +// let body = Body::empty(); +// let request = Request::from_parts(parts, body); + +// let response = next.run(request).await; + +// let mut response = handle_response( +// &uri_path, +// &request_method, +// request_query.as_deref(), +// response, +// ) +// .await?; + +// cli_batteries::trace_to_headers(response.headers_mut()); + +// Ok(response) +// } +// .instrument(span) +// .await +// } else { +// let body = body_to_string(body).await?; + +// let span = info_span!( +// "request", +// ?uri_path, +// ?request_method, +// ?request_query, +// ?body +// ); + +// async { +// cli_batteries::trace_from_headers(&parts.headers); + +// info!( +// ?uri_path, +// ?request_method, +// ?request_query, +// ?body, +// "Processing request" +// ); + +// let body = Body::from(body); +// let request = Request::from_parts(parts, body); + +// let response = next.run(request).await; + +// let mut response = handle_response( +// &uri_path, +// &request_method, +// request_query.as_deref(), +// response, +// ) +// .await?; + +// cli_batteries::trace_to_headers(response.headers_mut()); + +// Ok(response) +// } +// .instrument(span) +// .await +// } +// } + +// async fn handle_response( +// uri_path: &str, +// request_method: &Method, +// request_query: Option<&str>, +// response: Response, +// ) -> Result { +// let (parts, body) = response.into_parts(); + +// let response_status = parts.status; + +// let response = if response_status.is_client_error() +// || response_status.is_server_error() +// { +// let response_body = body_to_string(body).await?; + +// if response_status.is_client_error() { +// tracing::warn!( +// uri_path, +// ?request_method, +// ?request_query, +// ?response_status, +// ?response_body, +// "Error processing request" +// ); +// } else { +// tracing::error!( +// uri_path, +// ?request_method, +// ?request_query, +// ?response_status, +// ?response_body, +// "Error processing request" +// ); +// } + +// let body = axum::body::boxed(Body::from(response_body)); + +// Response::from_parts(parts, body) +// } else { +// Response::from_parts(parts, body) +// }; + +// info!( +// uri_path, +// ?request_method, +// ?request_query, +// ?response_status, +// "Finished processing request" +// ); + +// Ok(response) +// } + +// /// Reads the body of a request into a `Bytes` object chunk by chunk +// /// and returns an error if the body is larger than `MAX_REQUEST_BODY_SIZE`. +// async fn body_to_bytes_safe(body: B) -> Result +// where +// B: HttpBody, +// ::Error: std::error::Error, +// { +// use bytes::BufMut; + +// let size_hint = body +// .size_hint() +// .upper() +// .unwrap_or_else(|| body.size_hint().lower()); + +// if size_hint > MAX_REQUEST_BODY_SIZE { +// error!( +// "Request body too large: {} bytes (max: {} bytes)", +// size_hint, MAX_REQUEST_BODY_SIZE +// ); + +// return Err(StatusCode::PAYLOAD_TOO_LARGE); +// } + +// let mut body_bytes = Vec::with_capacity(size_hint as usize); + +// futures_util::pin_mut!(body); + +// while let Some(chunk) = body.data().await { +// let chunk = chunk.map_err(|error| { +// error!("Error reading body: {}", error); +// StatusCode::INTERNAL_SERVER_ERROR +// })?; + +// body_bytes.put(chunk); + +// if body_bytes.len() > MAX_REQUEST_BODY_SIZE as usize { +// error!( +// "Request body too large: {} bytes (max: {} bytes)", +// body_bytes.len(), +// MAX_REQUEST_BODY_SIZE +// ); + +// return Err(StatusCode::PAYLOAD_TOO_LARGE); +// } +// } + +// Ok(body_bytes.into()) +// } + +// async fn body_to_string(body: B) -> Result +// where +// B: HttpBody, +// ::Error: std::error::Error, +// { +// let body_bytes = body_to_bytes_safe(body).await?; + +// let s = match String::from_utf8(body_bytes.to_vec()) { +// Ok(s) => s, +// Err(error) => { +// error!("Error converting body to string: {}", error); +// return Err(StatusCode::BAD_REQUEST); +// } +// }; + +// Ok(s) +// } diff --git a/crates/tree_availability/src/server/middleware/remove_auth_layer.rs b/crates/tree_availability/src/server/middleware/remove_auth_layer.rs index 8eb03c3a..a812c42c 100644 --- a/crates/tree_availability/src/server/middleware/remove_auth_layer.rs +++ b/crates/tree_availability/src/server/middleware/remove_auth_layer.rs @@ -3,7 +3,10 @@ use axum::http::{Request, StatusCode}; use axum::middleware::Next; use axum::response::Response; -pub async fn middleware(mut request: Request, next: Next) -> Result { +pub async fn middleware( + mut request: Request, + next: Next, +) -> Result { request.headers_mut().remove(AUTHORIZATION); let response = next.run(request).await; diff --git a/crates/tree_availability/src/world_tree/block_scanner.rs b/crates/tree_availability/src/world_tree/block_scanner.rs index 47ba2172..98f414d5 100644 --- a/crates/tree_availability/src/world_tree/block_scanner.rs +++ b/crates/tree_availability/src/world_tree/block_scanner.rs @@ -7,7 +7,7 @@ use ethers::types::{ pub struct BlockScanner { middleware: M, - current_block: AtomicU64, + pub current_block: AtomicU64, window_size: u64, } @@ -43,6 +43,8 @@ where let from_block = current_block; let to_block = latest_block.min(from_block + self.window_size); + tracing::info!("Scanning from {} to {}", current_block, latest_block); + let next_current_block = to_block + 1; let from_block = Some(BlockNumber::Number(from_block.into())); @@ -63,6 +65,8 @@ where self.current_block .store(next_current_block, Ordering::SeqCst); + tracing::info!("Current block updated to {next_current_block}"); + Ok(logs) } } diff --git a/crates/tree_availability/src/world_tree/mod.rs b/crates/tree_availability/src/world_tree/mod.rs index ead736f3..c82a2a05 100644 --- a/crates/tree_availability/src/world_tree/mod.rs +++ b/crates/tree_availability/src/world_tree/mod.rs @@ -53,6 +53,8 @@ impl WorldTree { ) -> JoinHandle>> { let tree_data = self.tree_data.clone(); let tree_updater = self.tree_updater.clone(); + + tracing::info!("Spawning thread to sync tree"); tokio::spawn(async move { tree_updater.sync_to_head(&tree_data).await?; tree_updater.synced.store(true, Ordering::Relaxed); diff --git a/crates/tree_availability/src/world_tree/tree_data.rs b/crates/tree_availability/src/world_tree/tree_data.rs index f4542510..70a31827 100644 --- a/crates/tree_availability/src/world_tree/tree_data.rs +++ b/crates/tree_availability/src/world_tree/tree_data.rs @@ -57,10 +57,23 @@ impl TreeData { let mut tree_history = self.tree_history.write().await; if tree_history.len() == self.tree_history_size { - tree_history.pop_back(); + let historical_tree = tree_history + .pop_back() + .expect("Tree history length should be > 0"); + + tracing::info!( + "Popping tree from history with root {}", + historical_tree.root() + ); } - tree_history.push_front(self.tree.read().await.clone()); + let updated_tree = self.tree.read().await.clone(); + tracing::info!( + "Pushing tree to history with root {}", + updated_tree.root() + ); + + tree_history.push_front(updated_tree); } } @@ -75,18 +88,13 @@ impl TreeData { ) -> Option { let tree = self.tree.read().await; - // If the root is not specified, use the latest root - if root.is_none() { - Some(InclusionProof::new( - tree.root(), - Self::proof(&tree, identity)?, - None, - )) - } else { - let root = root.unwrap(); - + if let Some(root) = root { // If the root is the latest root, use the current version of the tree if root == tree.root() { + tracing::info!( + "Getting inclusion proof for {identity} at latest root" + ); + return Some(InclusionProof::new( root, Self::proof(&tree, identity)?, @@ -94,9 +102,13 @@ impl TreeData { )); } else { let tree_history = self.tree_history.read().await; - // // Otherwise, search the tree history for the root and use the corresponding tree + // Otherwise, search the tree history for the root and use the corresponding tree for prev_tree in tree_history.iter() { if prev_tree.root() == root { + tracing::info!( + "Getting inclusion proof for {identity} at root {root}" + ); + return Some(InclusionProof::new( root, Self::proof(prev_tree, identity)?, @@ -106,7 +118,20 @@ impl TreeData { } } + //TODO: should return an error if the tree root is specified but not in history + tracing::info!("Could not get inclusion proof for {identity} at {root}. Root not in tree history."); None + } else { + tracing::info!( + "Getting inclusion proof for {identity} at latest root" + ); + + // If the root is not specified, return a proof at the latest root + Some(InclusionProof::new( + tree.root(), + Self::proof(&tree, identity)?, + None, + )) } } diff --git a/crates/tree_availability/src/world_tree/tree_updater.rs b/crates/tree_availability/src/world_tree/tree_updater.rs index 3b3cfdad..2be8c1df 100644 --- a/crates/tree_availability/src/world_tree/tree_updater.rs +++ b/crates/tree_availability/src/world_tree/tree_updater.rs @@ -47,6 +47,8 @@ impl TreeUpdater { &self, tree_data: &TreeData, ) -> Result<(), TreeAvailabilityError> { + tracing::info!("Syncing tree to chain head"); + let logs = self .block_scanner .next( @@ -62,19 +64,22 @@ impl TreeUpdater { .map_err(TreeAvailabilityError::MiddlewareError)?; if logs.is_empty() { + tracing::info!("No `TreeChanged` events found within block range"); return Ok(()); } let mut futures = FuturesOrdered::new(); - //TODO: update this to use a throttle that can be set by the user + //TODO: update this to use a throttle that can be set by the user, however this is likely only going to result in one log per query for logs in logs.chunks(20) { for log in logs { - futures.push_back(self.middleware.get_transaction( - log.transaction_hash.ok_or( - TreeAvailabilityError::TransactionHashNotFound, - )?, - )); + let tx_hash = log + .transaction_hash + .ok_or(TreeAvailabilityError::TransactionHashNotFound)?; + + tracing::info!("Getting transaction for {tx_hash}"); + + futures.push_back(self.middleware.get_transaction(tx_hash)); } while let Some(transaction) = futures.next().await { @@ -97,12 +102,16 @@ impl TreeUpdater { tree_data: &TreeData, transaction: &Transaction, ) -> Result<(), TreeAvailabilityError> { + tracing::info!("Syncing from transaction {}", transaction.hash); + let calldata = &transaction.input; let function_selector = Selector::try_from(&calldata[0..4]) .expect("Transaction data does not contain a function selector"); if function_selector == RegisterIdentitiesCall::selector() { + tracing::info!("Decoding registerIdentities calldata"); + let register_identities_call = RegisterIdentitiesCall::decode(calldata.as_ref())?; @@ -113,10 +122,14 @@ impl TreeUpdater { .map(|u256: U256| Hash::from_limbs(u256.0)) .collect(); + tracing::info!(?start_index, ?identities); + tree_data .insert_many_at(start_index as usize, &identities) .await; } else if function_selector == DeleteIdentitiesCall::selector() { + tracing::info!("Decoding deleteIdentities calldata"); + let delete_identities_call = DeleteIdentitiesCall::decode(calldata.as_ref())?;