diff --git a/rpc-server/src/config.rs b/rpc-server/src/config.rs index 4c1c9bfc..341491a8 100644 --- a/rpc-server/src/config.rs +++ b/rpc-server/src/config.rs @@ -8,6 +8,10 @@ pub struct Opts { #[clap(long, env = "NEAR_RPC_URL")] pub rpc_url: http::Uri, + // near network archive rpc url + #[clap(long, env = "ARCHIVE_NEAR_RPC_URL")] + pub archive_rpc_url: Option, + // Indexer bucket name #[clap(long, env = "AWS_BUCKET_NAME")] pub s3_bucket_name: String, @@ -140,7 +144,7 @@ impl Opts { pub struct ServerContext { pub s3_client: near_lake_framework::s3_fetchers::LakeS3Client, pub db_manager: std::sync::Arc>, - pub near_rpc_client: near_jsonrpc_client::JsonRpcClient, + pub near_rpc_client: crate::utils::JsonRpcClient, pub s3_bucket_name: String, pub genesis_config: near_chain_configs::GenesisConfig, pub blocks_cache: @@ -158,7 +162,7 @@ impl ServerContext { pub fn new( s3_client: near_lake_framework::s3_fetchers::LakeS3Client, db_manager: impl database::ReaderDbManager + Sync + Send + 'static, - near_rpc_client: near_jsonrpc_client::JsonRpcClient, + near_rpc_client: crate::utils::JsonRpcClient, s3_bucket_name: String, genesis_config: near_chain_configs::GenesisConfig, blocks_cache: std::sync::Arc< diff --git a/rpc-server/src/main.rs b/rpc-server/src/main.rs index 9762280b..9cf2f92e 100644 --- a/rpc-server/src/main.rs +++ b/rpc-server/src/main.rs @@ -74,9 +74,10 @@ async fn main() -> anyhow::Result<()> { #[cfg(not(feature = "tracing-instrumentation"))] init_logging(false)?; - let near_rpc_client = near_jsonrpc_client::JsonRpcClient::connect(opts.rpc_url.to_string()); + let near_rpc_client = + utils::JsonRpcClient::new(opts.rpc_url.clone(), opts.archive_rpc_url.clone()); // We want to set a custom referer to let NEAR JSON RPC nodes know that we are a read-rpc instance - let near_rpc_client = near_rpc_client.header(("Referer", "read-rpc"))?; // TODO: make it configurable + let near_rpc_client = near_rpc_client.header("Referer", "read-rpc")?; // TODO: make it configurable let final_block = get_final_cache_block(&near_rpc_client) .await @@ -149,7 +150,7 @@ async fn main() -> anyhow::Result<()> { s3_config, )), db_manager, - near_rpc_client.clone(), + near_rpc_client, opts.s3_bucket_name.clone(), genesis_config, std::sync::Arc::clone(&blocks_cache), diff --git a/rpc-server/src/modules/blocks/methods.rs b/rpc-server/src/modules/blocks/methods.rs index 6370c508..5351f8c8 100644 --- a/rpc-server/src/modules/blocks/methods.rs +++ b/rpc-server/src/modules/blocks/methods.rs @@ -6,8 +6,6 @@ use crate::modules::blocks::utils::{ #[cfg(feature = "shadow_data_consistency")] use crate::utils::shadow_compare_results; use jsonrpc_v2::{Data, Params}; - -use crate::utils::proxy_rpc_call; use near_primitives::trie_key::TrieKey; use near_primitives::views::StateChangeValueView; @@ -27,7 +25,7 @@ pub async fn block( // genesis sync_checkpoint or earliest_available sync_checkpoint // and proxy to near-rpc crate::metrics::SYNC_CHECKPOINT_REQUESTS_TOTAL.inc(); - let block_view = proxy_rpc_call(&data.near_rpc_client, params).await?; + let block_view = data.near_rpc_client.call(params).await?; Ok(near_jsonrpc_primitives::types::blocks::RpcBlockResponse { block_view }) } near_primitives::types::BlockReference::Finality(finality) => { @@ -36,7 +34,7 @@ pub async fn block( // genesis sync_checkpoint or earliest_available sync_checkpoint // and proxy to near-rpc crate::metrics::OPTIMISTIC_REQUESTS_TOTAL.inc(); - let block_view = proxy_rpc_call(&data.near_rpc_client, params).await?; + let block_view = data.near_rpc_client.call(params).await?; Ok(near_jsonrpc_primitives::types::blocks::RpcBlockResponse { block_view }) } else { block_call(data, Params(params)).await @@ -100,7 +98,7 @@ pub async fn changes_in_block_by_type( // genesis sync_checkpoint or earliest_available sync_checkpoint // and proxy to near-rpc crate::metrics::SYNC_CHECKPOINT_REQUESTS_TOTAL.inc(); - Ok(proxy_rpc_call(&data.near_rpc_client, params).await?) + Ok(data.near_rpc_client.call(params).await?) } near_primitives::types::BlockReference::Finality(finality) => { if finality != &near_primitives::types::Finality::Final { @@ -108,7 +106,7 @@ pub async fn changes_in_block_by_type( // optimistic finality or doom_slug finality // and proxy to near-rpc crate::metrics::OPTIMISTIC_REQUESTS_TOTAL.inc(); - Ok(proxy_rpc_call(&data.near_rpc_client, params).await?) + Ok(data.near_rpc_client.call(params).await?) } else { changes_in_block_by_type_call(data, Params(params)).await } @@ -138,7 +136,7 @@ pub async fn changes_in_block( // genesis sync_checkpoint or earliest_available sync_checkpoint // and proxy to near-rpc crate::metrics::SYNC_CHECKPOINT_REQUESTS_TOTAL.inc(); - Ok(proxy_rpc_call(&data.near_rpc_client, params).await?) + Ok(data.near_rpc_client.call(params).await?) } near_primitives::types::BlockReference::Finality(finality) => { if finality != &near_primitives::types::Finality::Final { @@ -146,7 +144,7 @@ pub async fn changes_in_block( // optimistic finality or doom_slug finality // and proxy to near-rpc crate::metrics::OPTIMISTIC_REQUESTS_TOTAL.inc(); - Ok(proxy_rpc_call(&data.near_rpc_client, params).await?) + Ok(data.near_rpc_client.call(params).await?) } else { changes_in_block_call(data, Params(params)).await } diff --git a/rpc-server/src/modules/clients/methods.rs b/rpc-server/src/modules/clients/methods.rs index cc3ded4f..17aa49d3 100644 --- a/rpc-server/src/modules/clients/methods.rs +++ b/rpc-server/src/modules/clients/methods.rs @@ -1,6 +1,5 @@ use crate::config::ServerContext; use crate::errors::RPCError; -use crate::utils::proxy_rpc_call; use jsonrpc_v2::{Data, Params}; pub async fn light_client_proof( @@ -12,7 +11,7 @@ pub async fn light_client_proof( near_jsonrpc_primitives::types::light_client::RpcLightClientExecutionProofResponse, RPCError, > { - Ok(proxy_rpc_call(&data.near_rpc_client, params).await?) + Ok(data.near_rpc_client.call(params).await?) } pub async fn next_light_client_block( @@ -22,7 +21,7 @@ pub async fn next_light_client_block( >, ) -> Result { - match proxy_rpc_call(&data.near_rpc_client, params).await? { + match data.near_rpc_client.call(params).await? { Some(light_client_block) => Ok( near_jsonrpc_primitives::types::light_client::RpcLightClientNextBlockResponse { light_client_block: Some(std::sync::Arc::new(light_client_block)), diff --git a/rpc-server/src/modules/gas/methods.rs b/rpc-server/src/modules/gas/methods.rs index 6d808fb3..0bde82a3 100644 --- a/rpc-server/src/modules/gas/methods.rs +++ b/rpc-server/src/modules/gas/methods.rs @@ -1,12 +1,11 @@ use crate::config::ServerContext; use crate::errors::RPCError; -use crate::utils::proxy_rpc_call; use jsonrpc_v2::{Data, Params}; pub async fn gas_price( data: Data, Params(params): Params, ) -> Result { - let gas_price_view = proxy_rpc_call(&data.near_rpc_client, params).await?; + let gas_price_view = data.near_rpc_client.call(params).await?; Ok(near_jsonrpc_primitives::types::gas_price::RpcGasPriceResponse { gas_price_view }) } diff --git a/rpc-server/src/modules/network/methods.rs b/rpc-server/src/modules/network/methods.rs index f35554cb..f1ea56b1 100644 --- a/rpc-server/src/modules/network/methods.rs +++ b/rpc-server/src/modules/network/methods.rs @@ -3,7 +3,6 @@ use crate::errors::RPCError; use crate::modules::network::{ friendly_memory_size_format, parse_validator_request, StatusResponse, }; -use crate::utils::proxy_rpc_call; use jsonrpc_v2::{Data, Params}; use serde_json::Value; use sysinfo::{System, SystemExt}; @@ -66,7 +65,7 @@ pub async fn validators( ) -> Result { match parse_validator_request(params).await { Ok(request) => { - let validator_info = proxy_rpc_call(&data.near_rpc_client, request).await?; + let validator_info = data.near_rpc_client.call(request).await?; Ok(near_jsonrpc_primitives::types::validator::RpcValidatorResponse { validator_info }) } Err(err) => Err(RPCError::parse_error(&err.to_string())), @@ -77,7 +76,7 @@ pub async fn validators_ordered( data: Data, Params(params): Params, ) -> Result { - Ok(proxy_rpc_call(&data.near_rpc_client, params).await?) + Ok(data.near_rpc_client.call(params).await?) } pub async fn genesis_config( @@ -91,6 +90,6 @@ pub async fn protocol_config( data: Data, Params(params): Params, ) -> Result { - let config_view = proxy_rpc_call(&data.near_rpc_client, params).await?; + let config_view = data.near_rpc_client.call(params).await?; Ok(near_jsonrpc_primitives::types::config::RpcProtocolConfigResponse { config_view }) } diff --git a/rpc-server/src/modules/queries/methods.rs b/rpc-server/src/modules/queries/methods.rs index d86c8d74..d36ee03f 100644 --- a/rpc-server/src/modules/queries/methods.rs +++ b/rpc-server/src/modules/queries/methods.rs @@ -5,7 +5,6 @@ use crate::modules::blocks::CacheBlock; #[cfg(feature = "account_access_keys")] use crate::modules::queries::utils::fetch_list_access_keys_from_db; use crate::modules::queries::utils::{fetch_state_from_db, run_contract}; -use crate::utils::proxy_rpc_call; #[cfg(feature = "shadow_data_consistency")] use crate::utils::shadow_compare_results; use jsonrpc_v2::{Data, Params}; @@ -26,7 +25,7 @@ pub async fn query( // genesis sync_checkpoint or earliest_available sync_checkpoint // and proxy to near-rpc crate::metrics::SYNC_CHECKPOINT_REQUESTS_TOTAL.inc(); - Ok(proxy_rpc_call(&data.near_rpc_client, params).await?) + Ok(data.near_rpc_client.call(params).await?) } near_primitives::types::BlockReference::Finality(finality) => { if finality != &near_primitives::types::Finality::Final { @@ -34,7 +33,7 @@ pub async fn query( // optimistic finality or doom_slug finality // and proxy to near-rpc crate::metrics::OPTIMISTIC_REQUESTS_TOTAL.inc(); - Ok(proxy_rpc_call(&data.near_rpc_client, params).await?) + Ok(data.near_rpc_client.call(params).await?) } else { query_call(data, Params(params)).await } @@ -94,7 +93,7 @@ async fn query_call( near_primitives::views::QueryRequest::ViewAccessKeyList { account_id } => { crate::metrics::QUERY_VIEW_ACCESS_KEYS_LIST_REQUESTS_TOTAL.inc(); #[cfg(not(feature = "account_access_keys"))] - return Ok(crate::utils::proxy_rpc_call(&data.near_rpc_client, params).await?); + return Ok(data.near_rpc_client.call(params).await?); #[cfg(feature = "account_access_keys")] { view_access_keys_list(&data, block, &account_id).await diff --git a/rpc-server/src/modules/transactions/methods.rs b/rpc-server/src/modules/transactions/methods.rs index 129b40e2..ae9daf2d 100644 --- a/rpc-server/src/modules/transactions/methods.rs +++ b/rpc-server/src/modules/transactions/methods.rs @@ -3,7 +3,6 @@ use crate::errors::RPCError; use crate::modules::transactions::{ parse_signed_transaction, parse_transaction_status_common_request, }; -use crate::utils::proxy_rpc_call; #[cfg(feature = "shadow_data_consistency")] use crate::utils::shadow_compare_results; use jsonrpc_v2::{Data, Params}; @@ -128,7 +127,7 @@ pub async fn send_tx_async( near_jsonrpc_client::methods::broadcast_tx_async::RpcBroadcastTxAsyncRequest { signed_transaction, }; - match proxy_rpc_call(&data.near_rpc_client, proxy_params).await { + match data.near_rpc_client.call(proxy_params).await { Ok(resp) => Ok(resp), Err(err) => Err(RPCError::internal_error(&err.to_string())), } @@ -154,7 +153,7 @@ pub async fn send_tx_commit( near_jsonrpc_client::methods::broadcast_tx_commit::RpcBroadcastTxCommitRequest { signed_transaction, }; - match proxy_rpc_call(&data.near_rpc_client, proxy_params).await { + match data.near_rpc_client.call(proxy_params).await { Ok(resp) => Ok( near_jsonrpc_primitives::types::transactions::RpcTransactionResponse { final_execution_outcome: FinalExecutionOutcome(resp), diff --git a/rpc-server/src/utils.rs b/rpc-server/src/utils.rs index f17ec9e1..9677e292 100644 --- a/rpc-server/src/utils.rs +++ b/rpc-server/src/utils.rs @@ -7,21 +7,76 @@ use sysinfo::{System, SystemExt}; #[cfg(feature = "shadow_data_consistency")] const DEFAULT_RETRY_COUNT: u8 = 3; -#[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip(params)))] -pub async fn proxy_rpc_call( - client: &near_jsonrpc_client::JsonRpcClient, - params: M, -) -> near_jsonrpc_client::MethodCallResult -where - M: near_jsonrpc_client::methods::RpcMethod + std::fmt::Debug, -{ - tracing::debug!("PROXY call. {:?}", params); - client.call(params).await +#[derive(Clone)] +pub struct JsonRpcClient { + regular_client: near_jsonrpc_client::JsonRpcClient, + archive_client: near_jsonrpc_client::JsonRpcClient, +} + +impl JsonRpcClient { + pub fn new(rpc_url: http::Uri, archive_rpc_url: Option) -> Self { + let regular_client = near_jsonrpc_client::JsonRpcClient::connect(rpc_url.to_string()); + let archive_client = match archive_rpc_url { + Some(archive_rpc_url) => { + near_jsonrpc_client::JsonRpcClient::connect(archive_rpc_url.to_string()) + } + None => regular_client.clone(), + }; + Self { + regular_client, + archive_client, + } + } + + pub fn header( + mut self, + header_name: &'static str, + header_value: &'static str, + ) -> anyhow::Result { + self.regular_client = self.regular_client.header((header_name, header_value))?; + self.archive_client = self.archive_client.header((header_name, header_value))?; + Ok(self) + } + + async fn rpc_call( + &self, + params: M, + is_archive: bool, + ) -> near_jsonrpc_client::MethodCallResult + where + M: near_jsonrpc_client::methods::RpcMethod + std::fmt::Debug, + { + if is_archive { + self.archive_client.call(params).await + } else { + self.regular_client.call(params).await + } + } + + pub async fn call( + &self, + params: M, + ) -> near_jsonrpc_client::MethodCallResult + where + M: near_jsonrpc_client::methods::RpcMethod + std::fmt::Debug, + { + tracing::debug!("PROXY call. {:?}", params); + self.rpc_call(params, false).await + } + + pub async fn archive_call( + &self, + params: M, + ) -> near_jsonrpc_client::MethodCallResult + where + M: near_jsonrpc_client::methods::RpcMethod + std::fmt::Debug, + { + tracing::debug!("ARCHIVAL PROXY call. {:?}", params); + self.rpc_call(params, true).await + } } -pub async fn get_final_cache_block( - near_rpc_client: &near_jsonrpc_client::JsonRpcClient, -) -> Option { +pub async fn get_final_cache_block(near_rpc_client: &JsonRpcClient) -> Option { let block_request_method = near_jsonrpc_client::methods::block::RpcBlockRequest { block_reference: near_primitives::types::BlockReference::Finality( near_primitives::types::Finality::Final, @@ -146,7 +201,7 @@ pub(crate) async fn gigabytes_to_bytes(gigabytes: f64) -> usize { /// The function takes three arguments: /// /// `readrpc_response`: a `Result` object representing the results from Read RPC. -/// `client`: `near_jsonrpc_client::JsonRpcClient`. +/// `client`: `JsonRpcClient`. /// `params`: `near_jsonrpc_client::methods::RpcMethod` trait. /// /// In case of a successful comparison, the function returns `Ok(())`. @@ -154,7 +209,7 @@ pub(crate) async fn gigabytes_to_bytes(gigabytes: f64) -> usize { #[cfg(feature = "shadow_data_consistency")] pub async fn shadow_compare_results( read_rpc_response: Result, - client: near_jsonrpc_client::JsonRpcClient, + client: JsonRpcClient, params: M, read_rpc_response_is_ok: bool, ) -> Result<(), ShadowDataConsistencyError> @@ -171,7 +226,7 @@ where } }; - let mut near_rpc_response = client.call(¶ms).await; + let mut near_rpc_response = client.archive_call(¶ms).await; for _ in 0..DEFAULT_RETRY_COUNT { if let Err(json_rpc_err) = &near_rpc_response { @@ -188,7 +243,7 @@ where } }; if retry { - near_rpc_response = client.call(¶ms).await; + near_rpc_response = client.archive_call(¶ms).await; } else { break; }