diff --git a/rpc-server/src/main.rs b/rpc-server/src/main.rs index cd3bca56..848fb32e 100644 --- a/rpc-server/src/main.rs +++ b/rpc-server/src/main.rs @@ -1,16 +1,4 @@ -use actix_web::{ - web::{self}, - App, HttpResponse, HttpServer, -}; use mimalloc::MiMalloc; -use near_jsonrpc::{ - primitives::{ - errors::{RpcError, RpcErrorKind, RpcRequestValidationErrorKind}, - message::{Message, Request}, - }, - RpcRequest, -}; -use serde_json::Value; #[global_allocator] static GLOBAL: MiMalloc = MiMalloc; @@ -22,7 +10,6 @@ mod cache; mod config; mod health; mod metrics; -mod middlewares; mod modules; mod utils; @@ -32,8 +19,12 @@ pub(crate) const RPC_SERVER: &str = "read_rpc_server"; /// Serialises response of a query into JSON to be sent to the client. /// /// Returns an internal server error if the value fails to serialise. -fn serialize_response(value: impl serde::ser::Serialize) -> Result { - serde_json::to_value(value).map_err(|err| RpcError::serialization_error(err.to_string())) +fn serialize_response( + value: impl serde::ser::Serialize, +) -> Result { + serde_json::to_value(value).map_err(|err| { + near_jsonrpc::primitives::errors::RpcError::serialization_error(err.to_string()) + }) } /// Processes a specific method call. @@ -43,29 +34,31 @@ fn serialize_response(value: impl serde::ser::Serialize) -> Result( - request: Request, + request: near_jsonrpc::primitives::message::Request, callback: impl FnOnce(R) -> F, -) -> Result +) -> Result where - R: RpcRequest, + R: near_jsonrpc::RpcRequest, V: serde::ser::Serialize, - RpcError: std::convert::From, + near_jsonrpc::primitives::errors::RpcError: From, F: std::future::Future>, { serialize_response(callback(R::parse(request.params)?).await?) } async fn rpc_handler( - data: web::Data, - payload: web::Json, -) -> HttpResponse { - let Message::Request(request) = payload.0 else { - return HttpResponse::BadRequest().finish(); + data: actix_web::web::Data, + payload: actix_web::web::Json, +) -> actix_web::HttpResponse { + let near_jsonrpc::primitives::message::Message::Request(request) = payload.0 else { + return actix_web::HttpResponse::BadRequest().finish(); }; let id = request.id.clone(); let method_name = request.method.clone(); + let mut method_not_found = false; + let result = match method_name.as_ref() { // custom request methods "view_state_paginated" => { @@ -74,7 +67,7 @@ async fn rpc_handler( modules::state::methods::view_state_paginated(data, request_data).await, ) } else { - Err(RpcError::parse_error( + Err(near_jsonrpc::primitives::errors::RpcError::parse_error( "Failed to parse request data".to_string(), )) } @@ -231,31 +224,36 @@ async fn rpc_handler( }) .await } - _ => Err(RpcError::method_not_found(method_name.clone())), + _ => { + method_not_found = true; + Err(near_jsonrpc::primitives::errors::RpcError::method_not_found(method_name.clone())) + } }; - match &result { - Ok(_) => { - metrics::METHOD_CALLS_COUNTER - .with_label_values(&[method_name.as_ref()]) - .inc(); - } - Err(err) => match &err.error_struct { - Some(RpcErrorKind::RequestValidationError(validation_error)) => { - match validation_error { - RpcRequestValidationErrorKind::ParseError { .. } => { - metrics::METHOD_ERRORS_TOTAL - .with_label_values(&[method_name.as_ref(), "PARSE_ERROR"]) - .inc() - } - RpcRequestValidationErrorKind::MethodNotFound { .. } => { - metrics::METHOD_CALLS_COUNTER - .with_label_values(&["METHOD_NOT_FOUND"]) - .inc() - } - } - } - Some(RpcErrorKind::HandlerError(error_struct)) => { + // increase METHOD_CALLS_COUNTER for each method call + if method_not_found { + metrics::METHOD_CALLS_COUNTER + .with_label_values(&["METHOD_NOT_FOUND"]) + .inc(); + } else { + // For query method we calculate the number of total calls in the method + // and calculate the number of query by types in the inside query handler + metrics::METHOD_CALLS_COUNTER + .with_label_values(&[method_name.as_ref()]) + .inc(); + }; + + // calculate method error metrics + if let Err(err) = &result { + match &err.error_struct { + Some(near_jsonrpc::primitives::errors::RpcErrorKind::RequestValidationError( + near_jsonrpc::primitives::errors::RpcRequestValidationErrorKind::ParseError { + .. + }, + )) => metrics::METHOD_ERRORS_TOTAL + .with_label_values(&[method_name.as_ref(), "PARSE_ERROR"]) + .inc(), + Some(near_jsonrpc::primitives::errors::RpcErrorKind::HandlerError(error_struct)) => { if let Some(error_name) = error_struct.get("name").and_then(serde_json::Value::as_str) { @@ -264,42 +262,52 @@ async fn rpc_handler( .inc(); } } - Some(RpcErrorKind::InternalError(_)) => { + Some(near_jsonrpc::primitives::errors::RpcErrorKind::InternalError(_)) => { metrics::METHOD_ERRORS_TOTAL .with_label_values(&[method_name.as_ref(), "INTERNAL_ERROR"]) .inc(); } None => {} - }, + _ => {} + } } let mut response = if cfg!(not(feature = "detailed-status-codes")) { - HttpResponse::Ok() + actix_web::HttpResponse::Ok() } else { match &result { - Ok(_) => HttpResponse::Ok(), + Ok(_) => actix_web::HttpResponse::Ok(), Err(err) => match &err.error_struct { - Some(RpcErrorKind::RequestValidationError(_)) => HttpResponse::BadRequest(), - Some(RpcErrorKind::HandlerError(error_struct)) => { + Some(near_jsonrpc::primitives::errors::RpcErrorKind::RequestValidationError(_)) => { + actix_web::HttpResponse::BadRequest() + } + Some(near_jsonrpc::primitives::errors::RpcErrorKind::HandlerError( + error_struct, + )) => { if let Some(error_name) = error_struct.get("name").and_then(serde_json::Value::as_str) { if error_name == "TIMEOUT_ERROR" { - HttpResponse::RequestTimeout() + actix_web::HttpResponse::RequestTimeout() } else { - HttpResponse::Ok() + actix_web::HttpResponse::Ok() } } else { - HttpResponse::Ok() + actix_web::HttpResponse::Ok() } } - Some(RpcErrorKind::InternalError(_)) => HttpResponse::InternalServerError(), - None => HttpResponse::Ok(), + Some(near_jsonrpc::primitives::errors::RpcErrorKind::InternalError(_)) => { + actix_web::HttpResponse::InternalServerError() + } + None => actix_web::HttpResponse::Ok(), }, } }; - response.json(Message::response(id, result.map_err(RpcError::from))) + response.json(near_jsonrpc::primitives::message::Message::response( + id, + result.map_err(near_jsonrpc::primitives::errors::RpcError::from), + )) } #[actix_web::main] @@ -326,7 +334,7 @@ async fn main() -> anyhow::Result<()> { let server_port = rpc_server_config.general.server_port; - let server_context = web::Data::new( + let server_context = actix_web::web::Data::new( config::ServerContext::init(rpc_server_config.clone(), near_rpc_client.clone()).await?, ); @@ -388,15 +396,14 @@ async fn main() -> anyhow::Result<()> { }); } - HttpServer::new(move || { + actix_web::HttpServer::new(move || { let cors = actix_cors::Cors::permissive(); - App::new() + actix_web::App::new() .wrap(cors) .wrap(tracing_actix_web::TracingLogger::default()) - .wrap(middlewares::RequestsCounters) .app_data(server_context.clone()) - .service(web::scope("/").route("", web::post().to(rpc_handler))) + .service(actix_web::web::scope("/").route("", actix_web::web::post().to(rpc_handler))) .service(metrics::get_metrics) .service(health::get_health_status) }) diff --git a/rpc-server/src/middlewares.rs b/rpc-server/src/middlewares.rs deleted file mode 100644 index d0f1593a..00000000 --- a/rpc-server/src/middlewares.rs +++ /dev/null @@ -1,102 +0,0 @@ -use crate::metrics::METHOD_CALLS_COUNTER; -use actix_web::dev::{forward_ready, Service, ServiceRequest, ServiceResponse, Transform}; -use futures::future::LocalBoxFuture; -use futures::StreamExt; -use near_jsonrpc::{primitives::message::Request, RpcRequest}; -use std::future::{ready, Ready}; - -// Middleware to count requests and methods calls -// This middleware is used to count the number of requests and the number of calls to each method -pub struct RequestsCounters; - -impl Transform for RequestsCounters -where - S: Service, Error = actix_web::Error> + 'static, - S::Future: 'static, - B: 'static, -{ - type Response = ServiceResponse; - type Error = actix_web::Error; - type Transform = RequestsCountersMiddleware; - type InitError = (); - type Future = Ready>; - - fn new_transform(&self, service: S) -> Self::Future { - ready(Ok(RequestsCountersMiddleware { - service: std::sync::Arc::new(service), - })) - } -} - -pub struct RequestsCountersMiddleware { - service: std::sync::Arc, -} - -impl Service for RequestsCountersMiddleware -where - S: Service, Error = actix_web::Error> + 'static, - S::Future: 'static, - B: 'static, -{ - type Response = ServiceResponse; - type Error = actix_web::Error; - type Future = LocalBoxFuture<'static, Result>; - - forward_ready!(service); - - fn call(&self, request: ServiceRequest) -> Self::Future { - let service_clone = self.service.clone(); - Box::pin(async move { - if request.path() != "/" { - return service_clone.call(request).await; - } - - let (req, mut payload) = request.into_parts(); - let mut body = actix_web::web::BytesMut::new(); - while let Some(chunk) = &payload.next().await { - match chunk { - Ok(chunk) => body.extend_from_slice(chunk), - Err(e) => { - tracing::error!("Error receiving payload: {:?}", e); - } - }; - } - - if let Ok(obj) = serde_json::from_slice::(&body) { - if obj.method == "query" { - if let Ok(query_request) = - near_jsonrpc::primitives::types::query::RpcQueryRequest::parse(obj.params) - { - let method = match &query_request.request { - near_primitives::views::QueryRequest::ViewAccount { .. } => { - "query_view_account" - } - near_primitives::views::QueryRequest::ViewCode { .. } => { - "query_view_code" - } - near_primitives::views::QueryRequest::ViewState { .. } => { - "query_view_state" - } - near_primitives::views::QueryRequest::ViewAccessKey { .. } => { - "query_view_access_key" - } - near_primitives::views::QueryRequest::ViewAccessKeyList { .. } => { - "query_view_access_key_list" - } - near_primitives::views::QueryRequest::CallFunction { .. } => { - "query_call_function" - } - }; - METHOD_CALLS_COUNTER.with_label_values(&[method]).inc() - } - } - }; - - let mut request = ServiceRequest::from_request(req); - let (_, mut payload) = actix_http::h1::Payload::create(true); - payload.unread_data(body.into()); - request.set_payload(payload.into()); - service_clone.call(request).await - }) - } -} diff --git a/rpc-server/src/modules/queries/methods.rs b/rpc-server/src/modules/queries/methods.rs index 9e44d1f4..3d628b48 100644 --- a/rpc-server/src/modules/queries/methods.rs +++ b/rpc-server/src/modules/queries/methods.rs @@ -34,6 +34,10 @@ pub async fn query( "query_view_access_key_list" } }; + // increase query method calls counter + crate::metrics::METHOD_CALLS_COUNTER + .with_label_values(&[method_name]) + .inc(); if let near_primitives::types::BlockReference::Finality( near_primitives::types::Finality::None,