Skip to content

Commit

Permalink
feat: Require NEAR regular RPC URL additionally in configs
Browse files Browse the repository at this point in the history
  • Loading branch information
kobayurii committed Nov 20, 2023
1 parent 0a68b52 commit 240c881
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 46 deletions.
8 changes: 6 additions & 2 deletions rpc-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ pub struct Opts {
#[clap(long, env = "NEAR_RPC_URL")]
pub rpc_url: http::Uri,

// near network archive rpc url
#[clap(long, env = "ARCHIVE_NEAR_RPC_URL")]
pub archive_rpc_url: Option<http::Uri>,

// Indexer bucket name
#[clap(long, env = "AWS_BUCKET_NAME")]
pub s3_bucket_name: String,
Expand Down Expand Up @@ -140,7 +144,7 @@ impl Opts {
pub struct ServerContext {
pub s3_client: near_lake_framework::s3_fetchers::LakeS3Client,
pub db_manager: std::sync::Arc<Box<dyn database::ReaderDbManager + Sync + Send + 'static>>,
pub near_rpc_client: near_jsonrpc_client::JsonRpcClient,
pub near_rpc_client: crate::utils::JsonRpcClient,
pub s3_bucket_name: String,
pub genesis_config: near_chain_configs::GenesisConfig,
pub blocks_cache:
Expand All @@ -158,7 +162,7 @@ impl ServerContext {
pub fn new(
s3_client: near_lake_framework::s3_fetchers::LakeS3Client,
db_manager: impl database::ReaderDbManager + Sync + Send + 'static,
near_rpc_client: near_jsonrpc_client::JsonRpcClient,
near_rpc_client: crate::utils::JsonRpcClient,
s3_bucket_name: String,
genesis_config: near_chain_configs::GenesisConfig,
blocks_cache: std::sync::Arc<
Expand Down
7 changes: 4 additions & 3 deletions rpc-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ async fn main() -> anyhow::Result<()> {
#[cfg(not(feature = "tracing-instrumentation"))]
init_logging(false)?;

let near_rpc_client = near_jsonrpc_client::JsonRpcClient::connect(opts.rpc_url.to_string());
let near_rpc_client =
utils::JsonRpcClient::new(opts.rpc_url.clone(), opts.archive_rpc_url.clone());
// We want to set a custom referer to let NEAR JSON RPC nodes know that we are a read-rpc instance
let near_rpc_client = near_rpc_client.header(("Referer", "read-rpc"))?; // TODO: make it configurable
let near_rpc_client = near_rpc_client.header("Referer", "read-rpc")?; // TODO: make it configurable

let final_block = get_final_cache_block(&near_rpc_client)
.await
Expand Down Expand Up @@ -149,7 +150,7 @@ async fn main() -> anyhow::Result<()> {
s3_config,
)),
db_manager,
near_rpc_client.clone(),
near_rpc_client,
opts.s3_bucket_name.clone(),
genesis_config,
std::sync::Arc::clone(&blocks_cache),
Expand Down
14 changes: 6 additions & 8 deletions rpc-server/src/modules/blocks/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ use crate::modules::blocks::utils::{
#[cfg(feature = "shadow_data_consistency")]
use crate::utils::shadow_compare_results;
use jsonrpc_v2::{Data, Params};

use crate::utils::proxy_rpc_call;
use near_primitives::trie_key::TrieKey;
use near_primitives::views::StateChangeValueView;

Expand All @@ -27,7 +25,7 @@ pub async fn block(
// genesis sync_checkpoint or earliest_available sync_checkpoint
// and proxy to near-rpc
crate::metrics::SYNC_CHECKPOINT_REQUESTS_TOTAL.inc();
let block_view = proxy_rpc_call(&data.near_rpc_client, params).await?;
let block_view = data.near_rpc_client.call(params).await?;
Ok(near_jsonrpc_primitives::types::blocks::RpcBlockResponse { block_view })
}
near_primitives::types::BlockReference::Finality(finality) => {
Expand All @@ -36,7 +34,7 @@ pub async fn block(
// genesis sync_checkpoint or earliest_available sync_checkpoint
// and proxy to near-rpc
crate::metrics::OPTIMISTIC_REQUESTS_TOTAL.inc();
let block_view = proxy_rpc_call(&data.near_rpc_client, params).await?;
let block_view = data.near_rpc_client.call(params).await?;
Ok(near_jsonrpc_primitives::types::blocks::RpcBlockResponse { block_view })
} else {
block_call(data, Params(params)).await
Expand Down Expand Up @@ -100,15 +98,15 @@ pub async fn changes_in_block_by_type(
// genesis sync_checkpoint or earliest_available sync_checkpoint
// and proxy to near-rpc
crate::metrics::SYNC_CHECKPOINT_REQUESTS_TOTAL.inc();
Ok(proxy_rpc_call(&data.near_rpc_client, params).await?)
Ok(data.near_rpc_client.call(params).await?)
}
near_primitives::types::BlockReference::Finality(finality) => {
if finality != &near_primitives::types::Finality::Final {
// Increase the OPTIMISTIC_REQUESTS_TOTAL metric if the request has
// optimistic finality or doom_slug finality
// and proxy to near-rpc
crate::metrics::OPTIMISTIC_REQUESTS_TOTAL.inc();
Ok(proxy_rpc_call(&data.near_rpc_client, params).await?)
Ok(data.near_rpc_client.call(params).await?)
} else {
changes_in_block_by_type_call(data, Params(params)).await
}
Expand Down Expand Up @@ -138,15 +136,15 @@ pub async fn changes_in_block(
// genesis sync_checkpoint or earliest_available sync_checkpoint
// and proxy to near-rpc
crate::metrics::SYNC_CHECKPOINT_REQUESTS_TOTAL.inc();
Ok(proxy_rpc_call(&data.near_rpc_client, params).await?)
Ok(data.near_rpc_client.call(params).await?)
}
near_primitives::types::BlockReference::Finality(finality) => {
if finality != &near_primitives::types::Finality::Final {
// Increase the OPTIMISTIC_REQUESTS_TOTAL metric if the request has
// optimistic finality or doom_slug finality
// and proxy to near-rpc
crate::metrics::OPTIMISTIC_REQUESTS_TOTAL.inc();
Ok(proxy_rpc_call(&data.near_rpc_client, params).await?)
Ok(data.near_rpc_client.call(params).await?)
} else {
changes_in_block_call(data, Params(params)).await
}
Expand Down
5 changes: 2 additions & 3 deletions rpc-server/src/modules/clients/methods.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::config::ServerContext;
use crate::errors::RPCError;
use crate::utils::proxy_rpc_call;
use jsonrpc_v2::{Data, Params};

pub async fn light_client_proof(
Expand All @@ -12,7 +11,7 @@ pub async fn light_client_proof(
near_jsonrpc_primitives::types::light_client::RpcLightClientExecutionProofResponse,
RPCError,
> {
Ok(proxy_rpc_call(&data.near_rpc_client, params).await?)
Ok(data.near_rpc_client.call(params).await?)
}

pub async fn next_light_client_block(
Expand All @@ -22,7 +21,7 @@ pub async fn next_light_client_block(
>,
) -> Result<near_jsonrpc_primitives::types::light_client::RpcLightClientNextBlockResponse, RPCError>
{
match proxy_rpc_call(&data.near_rpc_client, params).await? {
match data.near_rpc_client.call(params).await? {
Some(light_client_block) => Ok(
near_jsonrpc_primitives::types::light_client::RpcLightClientNextBlockResponse {
light_client_block: Some(std::sync::Arc::new(light_client_block)),
Expand Down
3 changes: 1 addition & 2 deletions rpc-server/src/modules/gas/methods.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use crate::config::ServerContext;
use crate::errors::RPCError;
use crate::utils::proxy_rpc_call;
use jsonrpc_v2::{Data, Params};

pub async fn gas_price(
data: Data<ServerContext>,
Params(params): Params<near_jsonrpc_primitives::types::gas_price::RpcGasPriceRequest>,
) -> Result<near_jsonrpc_primitives::types::gas_price::RpcGasPriceResponse, RPCError> {
let gas_price_view = proxy_rpc_call(&data.near_rpc_client, params).await?;
let gas_price_view = data.near_rpc_client.call(params).await?;
Ok(near_jsonrpc_primitives::types::gas_price::RpcGasPriceResponse { gas_price_view })
}
7 changes: 3 additions & 4 deletions rpc-server/src/modules/network/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::errors::RPCError;
use crate::modules::network::{
friendly_memory_size_format, parse_validator_request, StatusResponse,
};
use crate::utils::proxy_rpc_call;
use jsonrpc_v2::{Data, Params};
use serde_json::Value;
use sysinfo::{System, SystemExt};
Expand Down Expand Up @@ -66,7 +65,7 @@ pub async fn validators(
) -> Result<near_jsonrpc_primitives::types::validator::RpcValidatorResponse, RPCError> {
match parse_validator_request(params).await {
Ok(request) => {
let validator_info = proxy_rpc_call(&data.near_rpc_client, request).await?;
let validator_info = data.near_rpc_client.call(request).await?;
Ok(near_jsonrpc_primitives::types::validator::RpcValidatorResponse { validator_info })
}
Err(err) => Err(RPCError::parse_error(&err.to_string())),
Expand All @@ -77,7 +76,7 @@ pub async fn validators_ordered(
data: Data<ServerContext>,
Params(params): Params<near_jsonrpc_primitives::types::validator::RpcValidatorsOrderedRequest>,
) -> Result<near_jsonrpc_primitives::types::validator::RpcValidatorsOrderedResponse, RPCError> {
Ok(proxy_rpc_call(&data.near_rpc_client, params).await?)
Ok(data.near_rpc_client.call(params).await?)
}

pub async fn genesis_config(
Expand All @@ -91,6 +90,6 @@ pub async fn protocol_config(
data: Data<ServerContext>,
Params(params): Params<near_jsonrpc_primitives::types::config::RpcProtocolConfigRequest>,
) -> Result<near_jsonrpc_primitives::types::config::RpcProtocolConfigResponse, RPCError> {
let config_view = proxy_rpc_call(&data.near_rpc_client, params).await?;
let config_view = data.near_rpc_client.call(params).await?;
Ok(near_jsonrpc_primitives::types::config::RpcProtocolConfigResponse { config_view })
}
7 changes: 3 additions & 4 deletions rpc-server/src/modules/queries/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::modules::blocks::CacheBlock;
#[cfg(feature = "account_access_keys")]
use crate::modules::queries::utils::fetch_list_access_keys_from_db;
use crate::modules::queries::utils::{fetch_state_from_db, run_contract};
use crate::utils::proxy_rpc_call;
#[cfg(feature = "shadow_data_consistency")]
use crate::utils::shadow_compare_results;
use jsonrpc_v2::{Data, Params};
Expand All @@ -26,15 +25,15 @@ pub async fn query(
// genesis sync_checkpoint or earliest_available sync_checkpoint
// and proxy to near-rpc
crate::metrics::SYNC_CHECKPOINT_REQUESTS_TOTAL.inc();
Ok(proxy_rpc_call(&data.near_rpc_client, params).await?)
Ok(data.near_rpc_client.call(params).await?)
}
near_primitives::types::BlockReference::Finality(finality) => {
if finality != &near_primitives::types::Finality::Final {
// Increase the OPTIMISTIC_REQUESTS_TOTAL metric if the request has
// optimistic finality or doom_slug finality
// and proxy to near-rpc
crate::metrics::OPTIMISTIC_REQUESTS_TOTAL.inc();
Ok(proxy_rpc_call(&data.near_rpc_client, params).await?)
Ok(data.near_rpc_client.call(params).await?)
} else {
query_call(data, Params(params)).await
}
Expand Down Expand Up @@ -94,7 +93,7 @@ async fn query_call(
near_primitives::views::QueryRequest::ViewAccessKeyList { account_id } => {
crate::metrics::QUERY_VIEW_ACCESS_KEYS_LIST_REQUESTS_TOTAL.inc();
#[cfg(not(feature = "account_access_keys"))]
return Ok(crate::utils::proxy_rpc_call(&data.near_rpc_client, params).await?);
return Ok(data.near_rpc_client.call(params).await?);
#[cfg(feature = "account_access_keys")]
{
view_access_keys_list(&data, block, &account_id).await
Expand Down
5 changes: 2 additions & 3 deletions rpc-server/src/modules/transactions/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::errors::RPCError;
use crate::modules::transactions::{
parse_signed_transaction, parse_transaction_status_common_request,
};
use crate::utils::proxy_rpc_call;
#[cfg(feature = "shadow_data_consistency")]
use crate::utils::shadow_compare_results;
use jsonrpc_v2::{Data, Params};
Expand Down Expand Up @@ -128,7 +127,7 @@ pub async fn send_tx_async(
near_jsonrpc_client::methods::broadcast_tx_async::RpcBroadcastTxAsyncRequest {
signed_transaction,
};
match proxy_rpc_call(&data.near_rpc_client, proxy_params).await {
match data.near_rpc_client.call(proxy_params).await {
Ok(resp) => Ok(resp),
Err(err) => Err(RPCError::internal_error(&err.to_string())),
}
Expand All @@ -154,7 +153,7 @@ pub async fn send_tx_commit(
near_jsonrpc_client::methods::broadcast_tx_commit::RpcBroadcastTxCommitRequest {
signed_transaction,
};
match proxy_rpc_call(&data.near_rpc_client, proxy_params).await {
match data.near_rpc_client.call(proxy_params).await {
Ok(resp) => Ok(
near_jsonrpc_primitives::types::transactions::RpcTransactionResponse {
final_execution_outcome: FinalExecutionOutcome(resp),
Expand Down
89 changes: 72 additions & 17 deletions rpc-server/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,76 @@ use sysinfo::{System, SystemExt};
#[cfg(feature = "shadow_data_consistency")]
const DEFAULT_RETRY_COUNT: u8 = 3;

#[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip(params)))]
pub async fn proxy_rpc_call<M>(
client: &near_jsonrpc_client::JsonRpcClient,
params: M,
) -> near_jsonrpc_client::MethodCallResult<M::Response, M::Error>
where
M: near_jsonrpc_client::methods::RpcMethod + std::fmt::Debug,
{
tracing::debug!("PROXY call. {:?}", params);
client.call(params).await
#[derive(Clone)]
pub struct JsonRpcClient {
regular_client: near_jsonrpc_client::JsonRpcClient,
archive_client: near_jsonrpc_client::JsonRpcClient,
}

impl JsonRpcClient {
pub fn new(rpc_url: http::Uri, archive_rpc_url: Option<http::Uri>) -> Self {
let regular_client = near_jsonrpc_client::JsonRpcClient::connect(rpc_url.to_string());
let archive_client = match archive_rpc_url {
Some(archive_rpc_url) => {
near_jsonrpc_client::JsonRpcClient::connect(archive_rpc_url.to_string())
}
None => regular_client.clone(),
};
Self {
regular_client,
archive_client,
}
}

pub fn header(
mut self,
header_name: &'static str,
header_value: &'static str,
) -> anyhow::Result<Self> {
self.regular_client = self.regular_client.header((header_name, header_value))?;
self.archive_client = self.archive_client.header((header_name, header_value))?;
Ok(self)
}

async fn rpc_call<M>(
&self,
params: M,
is_archive: bool,
) -> near_jsonrpc_client::MethodCallResult<M::Response, M::Error>
where
M: near_jsonrpc_client::methods::RpcMethod + std::fmt::Debug,
{
if is_archive {
self.archive_client.call(params).await
} else {
self.regular_client.call(params).await
}
}

pub async fn call<M>(
&self,
params: M,
) -> near_jsonrpc_client::MethodCallResult<M::Response, M::Error>
where
M: near_jsonrpc_client::methods::RpcMethod + std::fmt::Debug,
{
tracing::debug!("PROXY call. {:?}", params);
self.rpc_call(params, false).await
}

pub async fn archive_call<M>(
&self,
params: M,
) -> near_jsonrpc_client::MethodCallResult<M::Response, M::Error>
where
M: near_jsonrpc_client::methods::RpcMethod + std::fmt::Debug,
{
tracing::debug!("ARCHIVAL PROXY call. {:?}", params);
self.rpc_call(params, true).await
}
}

pub async fn get_final_cache_block(
near_rpc_client: &near_jsonrpc_client::JsonRpcClient,
) -> Option<CacheBlock> {
pub async fn get_final_cache_block(near_rpc_client: &JsonRpcClient) -> Option<CacheBlock> {
let block_request_method = near_jsonrpc_client::methods::block::RpcBlockRequest {
block_reference: near_primitives::types::BlockReference::Finality(
near_primitives::types::Finality::Final,
Expand Down Expand Up @@ -146,15 +201,15 @@ pub(crate) async fn gigabytes_to_bytes(gigabytes: f64) -> usize {
/// The function takes three arguments:
///
/// `readrpc_response`: a `Result<serde_json::Value, serde_json::Error>` object representing the results from Read RPC.
/// `client`: `near_jsonrpc_client::JsonRpcClient`.
/// `client`: `JsonRpcClient`.
/// `params`: `near_jsonrpc_client::methods::RpcMethod` trait.
///
/// In case of a successful comparison, the function returns `Ok(())`.
/// Otherwise, it returns `Err(ShadowDataConsistencyError)`.
#[cfg(feature = "shadow_data_consistency")]
pub async fn shadow_compare_results<M>(
read_rpc_response: Result<serde_json::Value, serde_json::Error>,
client: near_jsonrpc_client::JsonRpcClient,
client: JsonRpcClient,
params: M,
read_rpc_response_is_ok: bool,
) -> Result<(), ShadowDataConsistencyError>
Expand All @@ -171,7 +226,7 @@ where
}
};

let mut near_rpc_response = client.call(&params).await;
let mut near_rpc_response = client.archive_call(&params).await;

for _ in 0..DEFAULT_RETRY_COUNT {
if let Err(json_rpc_err) = &near_rpc_response {
Expand All @@ -188,7 +243,7 @@ where
}
};
if retry {
near_rpc_response = client.call(&params).await;
near_rpc_response = client.archive_call(&params).await;
} else {
break;
}
Expand Down

0 comments on commit 240c881

Please sign in to comment.