From 595b025030d380676973f8268ffe5342d84c85fa Mon Sep 17 00:00:00 2001 From: Yurii Koba Date: Wed, 6 Nov 2024 14:11:36 +0200 Subject: [PATCH 1/4] Corrected state size calculation logic (#371) * fix to calculate state size * clippy * fix calculate state size limit * changelog --- CHANGELOG.md | 11 +++++++++ rpc-server/src/config.rs | 4 ++++ rpc-server/src/metrics.rs | 10 +++++++- rpc-server/src/modules/blocks/mod.rs | 9 ------- .../modules/queries/contract_runner/mod.rs | 24 ++++++++++++++++--- rpc-server/src/modules/queries/methods.rs | 14 ++++++++++- 6 files changed, 58 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bad5c2fc..a0e51955 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased](https://github.com/near/read-rpc/compare/main...develop) +### What's Changed +* Corrected state size calculation logic. +* Integrated cargo_pkg_version metric to reflect the current server version. +* Delete unnecessary debug logs about update blocks by finalities + +## [0.3.1](https://github.com/near/read-rpc/releases/tag/v0.3.1) + +### Supported Nearcore Version +- nearcore v2.3.0 +- rust v1.81.0 + ### What's Changed * Improved bulk insertion of state_changes, reducing database requests from hundreds to a maximum of 7 per block. * Configuration improvement. Create default config.toml on start application to loaded parameters from the environment variables. diff --git a/rpc-server/src/config.rs b/rpc-server/src/config.rs index f3271afc..65dacdab 100644 --- a/rpc-server/src/config.rs +++ b/rpc-server/src/config.rs @@ -153,6 +153,10 @@ impl ServerContext { let compiled_contract_code_cache = std::sync::Arc::new(CompiledCodeCache::new(contract_code_cache_size_in_bytes)); + crate::metrics::CARGO_PKG_VERSION + .with_label_values(&[NEARD_VERSION]) + .inc(); + Ok(Self { s3_client, db_manager: std::sync::Arc::new(Box::new(db_manager)), diff --git a/rpc-server/src/metrics.rs b/rpc-server/src/metrics.rs index 206f3509..745b90a4 100644 --- a/rpc-server/src/metrics.rs +++ b/rpc-server/src/metrics.rs @@ -1,5 +1,5 @@ use actix_web::{get, Responder}; -use prometheus::{Encoder, IntCounterVec, IntGauge, IntGaugeVec, Opts}; +use prometheus::{CounterVec, Encoder, IntCounterVec, IntGauge, IntGaugeVec, Opts}; type Result = std::result::Result; @@ -113,6 +113,14 @@ lazy_static! { "Optimistic updating status. 0: working, 1: not working", ).unwrap(); + pub(crate) static ref CARGO_PKG_VERSION: CounterVec = { + let opts = Opts::new("cargo_pkg_version", "Cargo package version. This is used to track the version of the running server.") + .variable_label("version"); + let counter_vec = CounterVec::new(opts, &["version"]).expect("metric can be created"); + prometheus::register(Box::new(counter_vec.clone())).unwrap(); + counter_vec + }; + pub(crate) static ref LEGACY_DATABASE_TX_DETAILS: IntCounterVec = register_int_counter_vec( "legacy_database_tx_details", "Total number of calls to the legacy database for transaction details", diff --git a/rpc-server/src/modules/blocks/mod.rs b/rpc-server/src/modules/blocks/mod.rs index 7fb9e714..e4fac836 100644 --- a/rpc-server/src/modules/blocks/mod.rs +++ b/rpc-server/src/modules/blocks/mod.rs @@ -335,10 +335,6 @@ impl BlocksInfoByFinality { // Update final block info in the cache. // Executes every second. pub async fn update_final_block(&self, block_info: BlockInfo) { - tracing::debug!( - "Update final block info: {:?}", - block_info.block_cache.block_height - ); let mut final_block_lock = self.final_block.write().await; final_block_lock.block_cache = block_info.block_cache; final_block_lock.block_view = block_info.block_view; @@ -348,11 +344,6 @@ impl BlocksInfoByFinality { // Update optimistic block changes and optimistic block info in the cache. // Executes every second. pub async fn update_optimistic_block(&self, block_info: BlockInfo) { - tracing::debug!( - "Update optimistic block info: {:?}", - block_info.block_cache.block_height - ); - let mut optimistic_changes_lock = self.optimistic_changes.write().await; optimistic_changes_lock.account_changes = block_info.changes_in_block_account_map().await; diff --git a/rpc-server/src/modules/queries/contract_runner/mod.rs b/rpc-server/src/modules/queries/contract_runner/mod.rs index b21062d3..470e4f03 100644 --- a/rpc-server/src/modules/queries/contract_runner/mod.rs +++ b/rpc-server/src/modules/queries/contract_runner/mod.rs @@ -1,9 +1,8 @@ use std::collections::HashMap; -use near_vm_runner::ContractRuntimeCache; - use crate::modules::blocks::BlocksInfoByFinality; use code_storage::CodeStorage; +use near_vm_runner::ContractRuntimeCache; mod code_storage; @@ -137,12 +136,31 @@ pub async fn run_contract( block_hash: block.block_hash, } })?; + println!("Contract code len {}", code.data.len()); contract_code_cache.put(code_hash, code.data.clone()).await; Contract::new(Some(code.data), code_hash) } } }; + // We need to calculate the state size of the contract to determine if we should prefetch the state or not. + // The state size is the storage usage minus the code size. + // If the state size is less than the prefetch_state_size_limit, we prefetch the state. + let code_len = if let Some(contract_code) = &contract_code.contract_code { + contract_code.code().len() + } else if let Some(code) = contract_code_cache.get(&code_hash).await { + code.len() + } else { + db_manager + .get_contract_code(account_id, block.block_height, "query_call_function") + .await + .map(|code| code.data.len()) + .unwrap_or_default() + }; + let state_size = contract + .data + .storage_usage() + .saturating_sub(code_len as u64); // Init an external database interface for the Runtime logic let code_storage = CodeStorage::init( db_manager.clone(), @@ -150,7 +168,7 @@ pub async fn run_contract( block.block_height, validators, optimistic_data, - contract.data.storage_usage() <= prefetch_state_size_limit, + state_size <= prefetch_state_size_limit, ) .await; diff --git a/rpc-server/src/modules/queries/methods.rs b/rpc-server/src/modules/queries/methods.rs index 3d628b48..92d85d6b 100644 --- a/rpc-server/src/modules/queries/methods.rs +++ b/rpc-server/src/modules/queries/methods.rs @@ -443,7 +443,19 @@ async fn view_state( block_hash: block.block_hash, }, )?; - if prefix.is_empty() && account.data.storage_usage() > data.prefetch_state_size_limit { + + // Calculate the state size excluding the contract code size to check if it's too large to fetch. + // The state size is the storage usage minus the code size. + // more details: nearcore/runtime/runtime/src/state_viewer/mod.rs:150 + let code_len = data + .db_manager + .get_contract_code(account_id, block.block_height, "query_view_state") + .await + .map(|code| code.data.len() as u64) + .unwrap_or_default(); + let state_size = account.data.storage_usage().saturating_sub(code_len); + // If the prefix is empty and the state size is larger than the limit, return an error. + if prefix.is_empty() && state_size > data.prefetch_state_size_limit { return Err( near_jsonrpc::primitives::types::query::RpcQueryError::TooLargeContractState { contract_account_id: account_id.clone(), From dd6b2fd6f87869ecb4123c05e7aa4482d56793ff Mon Sep 17 00:00:00 2001 From: Yurii Koba Date: Fri, 15 Nov 2024 10:32:52 +0200 Subject: [PATCH 2/4] Support new method EXPERIMENTAL_congestion_level (#376) --- rpc-server/src/main.rs | 6 +++ rpc-server/src/modules/blocks/methods.rs | 63 +++++++++++++++++++++-- rpc-server/src/modules/network/methods.rs | 25 +++++---- 3 files changed, 79 insertions(+), 15 deletions(-) diff --git a/rpc-server/src/main.rs b/rpc-server/src/main.rs index 848fb32e..11d869d8 100644 --- a/rpc-server/src/main.rs +++ b/rpc-server/src/main.rs @@ -176,6 +176,12 @@ async fn rpc_handler( }) .await } + "EXPERIMENTAL_congestion_level" => { + process_method_call(request, |params| { + modules::blocks::methods::congestion_level(data, params) + }) + .await + } "EXPERIMENTAL_genesis_config" => { process_method_call(request, |_: ()| { modules::network::methods::genesis_config(data) diff --git a/rpc-server/src/modules/blocks/methods.rs b/rpc-server/src/modules/blocks/methods.rs index ae355c9c..85e7e63b 100644 --- a/rpc-server/src/modules/blocks/methods.rs +++ b/rpc-server/src/modules/blocks/methods.rs @@ -52,7 +52,7 @@ pub async fn chunk( near_jsonrpc::primitives::types::chunks::RpcChunkError, > { tracing::debug!("`chunk` called with parameters: {:?}", request_data); - let chunk_result = fetch_chunk(&data, request_data.chunk_reference.clone()).await; + let chunk_result = fetch_chunk(&data, request_data.chunk_reference.clone(), "chunk").await; #[cfg(feature = "shadow-data-consistency")] { crate::utils::shadow_compare_results_handler( @@ -67,6 +67,58 @@ pub async fn chunk( chunk_result } +// EXPERIMENTAL_congestion_level rpc method implementation +#[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip(data)))] +pub async fn congestion_level( + data: Data, + request_data: near_jsonrpc::primitives::types::congestion::RpcCongestionLevelRequest, +) -> Result< + near_jsonrpc::primitives::types::congestion::RpcCongestionLevelResponse, + near_jsonrpc::primitives::types::congestion::RpcCongestionLevelError, +> { + tracing::debug!( + "`EXPERIMENTAL_congestion_level` called with parameters: {:?}", + request_data + ); + let chunk_view = fetch_chunk( + &data, + request_data.chunk_reference.clone(), + "EXPERIMENTAL_congestion_level", + ) + .await? + .chunk_view; + let config_result = crate::modules::network::methods::protocol_config_call( + &data, + near_primitives::types::BlockReference::BlockId(near_primitives::types::BlockId::Height( + chunk_view.header.height_included, + )), + "EXPERIMENTAL_congestion_level", + ) + .await; + let config = config_result.map_err(|err: near_jsonrpc::primitives::types::config::RpcProtocolConfigError| match err { + near_jsonrpc::primitives::types::config::RpcProtocolConfigError::UnknownBlock { error_message } => { + near_jsonrpc::primitives::types::congestion::RpcCongestionLevelError::UnknownBlock { + error_message, + } + } + near_jsonrpc::primitives::types::config::RpcProtocolConfigError::InternalError { error_message } => { + near_jsonrpc::primitives::types::congestion::RpcCongestionLevelError::InternalError { + error_message, + } + } + })?; + let congestion_level = chunk_view + .header + .congestion_info + .map(|info| info.congestion_level(config.runtime_config.congestion_control_config)) + .unwrap_or(0.0); + Ok( + near_jsonrpc::primitives::types::congestion::RpcCongestionLevelResponse { + congestion_level, + }, + ) +} + /// `EXPERIMENTAL_changes` rpc method implementation /// calls proxy_rpc_call to get `EXPERIMENTAL_changes` from near-rpc if request parameters not supported by read-rpc /// as example: BlockReference for Finality::None is not supported by read-rpc @@ -379,6 +431,7 @@ pub async fn fetch_block( pub async fn fetch_chunk( data: &Data, chunk_reference: near_jsonrpc::primitives::types::chunks::ChunkReference, + method_name: &str, ) -> Result< near_jsonrpc::primitives::types::chunks::RpcChunkResponse, near_jsonrpc::primitives::types::chunks::RpcChunkError, @@ -400,7 +453,7 @@ pub async fn fetch_chunk( } near_primitives::types::BlockId::Hash(block_hash) => data .db_manager - .get_block_height_by_hash(block_hash, "chunk") + .get_block_height_by_hash(block_hash, method_name) .await .map_err(|err| { tracing::error!("Failed to fetch block by hash: {}", err); @@ -412,7 +465,7 @@ pub async fn fetch_chunk( // Check if the chunk stored in block with the given height if let Ok(block_height_shard_id) = data .db_manager - .get_block_by_height_and_shard_id(block_height, shard_id, "chunk") + .get_block_by_height_and_shard_id(block_height, shard_id, method_name) .await { (block_height_shard_id.0, block_height_shard_id.1) @@ -422,7 +475,7 @@ pub async fn fetch_chunk( } near_jsonrpc::primitives::types::chunks::ChunkReference::ChunkHash { chunk_id } => data .db_manager - .get_block_by_chunk_hash(chunk_id, "chunk") + .get_block_by_chunk_hash(chunk_id, method_name) .await .map_err( |_err| near_jsonrpc::primitives::types::chunks::RpcChunkError::UnknownChunk { @@ -444,7 +497,7 @@ pub async fn fetch_chunk( &near_primitives::types::BlockReference::BlockId(near_primitives::types::BlockId::Height( block_height, )), - "chunk", + method_name, Some(block_height), ) .await; diff --git a/rpc-server/src/modules/network/methods.rs b/rpc-server/src/modules/network/methods.rs index 3e73e43f..218dd83a 100644 --- a/rpc-server/src/modules/network/methods.rs +++ b/rpc-server/src/modules/network/methods.rs @@ -297,7 +297,12 @@ pub async fn protocol_config( request_data ); - let config_view = protocol_config_call(&data, request_data.block_reference.clone()).await; + let config_view = protocol_config_call( + &data, + request_data.block_reference.clone(), + "EXPERIMENTAL_protocol_config", + ) + .await; #[cfg(feature = "shadow-data-consistency")] { @@ -366,21 +371,21 @@ async fn validators_call( Ok(validators.validators_info) } -async fn protocol_config_call( +pub async fn protocol_config_call( data: &Data, block_reference: near_primitives::types::BlockReference, + method_name: &str, ) -> Result< near_chain_configs::ProtocolConfigView, near_jsonrpc::primitives::types::config::RpcProtocolConfigError, > { - let protocol_version = - get_protocol_version(data, block_reference, "EXPERIMENTAL_protocol_config") - .await - .map_err(|err| { - near_jsonrpc::primitives::types::config::RpcProtocolConfigError::UnknownBlock { - error_message: err.to_string(), - } - })?; + let protocol_version = get_protocol_version(data, block_reference, method_name) + .await + .map_err(|err| { + near_jsonrpc::primitives::types::config::RpcProtocolConfigError::UnknownBlock { + error_message: err.to_string(), + } + })?; // Stores runtime config for each protocol version // Create store of runtime configs for the given chain id. // From 556a8644dd8e89765ed7565e6cd3154bfa1b594d Mon Sep 17 00:00:00 2001 From: Yurii Koba Date: Tue, 19 Nov 2024 15:23:46 +0200 Subject: [PATCH 3/4] remove println after dubug --- rpc-server/src/modules/queries/contract_runner/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/rpc-server/src/modules/queries/contract_runner/mod.rs b/rpc-server/src/modules/queries/contract_runner/mod.rs index 470e4f03..8d41b6ac 100644 --- a/rpc-server/src/modules/queries/contract_runner/mod.rs +++ b/rpc-server/src/modules/queries/contract_runner/mod.rs @@ -136,7 +136,6 @@ pub async fn run_contract( block_hash: block.block_hash, } })?; - println!("Contract code len {}", code.data.len()); contract_code_cache.put(code_hash, code.data.clone()).await; Contract::new(Some(code.data), code_hash) } From 8ecf775c36714224a0a10eff5f5a18c941570710 Mon Sep 17 00:00:00 2001 From: Yurii Koba Date: Fri, 22 Nov 2024 15:24:40 +0200 Subject: [PATCH 4/4] add metric to calculate database reads queries by account_id, method_name and table_name (#377) --- database/src/metrics.rs | 6 +++ database/src/postgres/rpc_server.rs | 64 +++++++++++++++++++++++++++++ rpc-server/src/metrics.rs | 7 ---- 3 files changed, 70 insertions(+), 7 deletions(-) diff --git a/database/src/metrics.rs b/database/src/metrics.rs index 7ba79cb9..ea0adfa2 100644 --- a/database/src/metrics.rs +++ b/database/src/metrics.rs @@ -38,4 +38,10 @@ lazy_static! { &["method_name", "table_name"] ) .unwrap(); + pub(crate) static ref ACCOUTS_DATABASE_READ_QUERIES: IntCounterVec = register_int_counter_vec( + "account_database_read_queries_counter", + "Total number of accounts database reads queries by method_name and table_name", + &["account_id", "shard_id", "method_name", "table_name"] + ) + .unwrap(); } diff --git a/database/src/postgres/rpc_server.rs b/database/src/postgres/rpc_server.rs index ffe593b5..f6b06bcb 100644 --- a/database/src/postgres/rpc_server.rs +++ b/database/src/postgres/rpc_server.rs @@ -69,6 +69,14 @@ impl crate::ReaderDbManager for crate::PostgresDBManager { "state_changes_data", ]) .inc(); + crate::metrics::ACCOUTS_DATABASE_READ_QUERIES + .with_label_values(&[ + account_id.as_ref(), + &shard_id_pool.shard_id.to_string(), + method_name, + "state_changes_data", + ]) + .inc(); let page_state = if let Some(page_state_token) = page_token { borsh::from_slice::(&hex::decode(page_state_token)?)? } else { @@ -150,6 +158,14 @@ impl crate::ReaderDbManager for crate::PostgresDBManager { "state_changes_data", ]) .inc(); + crate::metrics::ACCOUTS_DATABASE_READ_QUERIES + .with_label_values(&[ + account_id.as_ref(), + &shard_id_pool.shard_id.to_string(), + method_name, + "state_changes_data", + ]) + .inc(); let mut items = std::collections::HashMap::new(); let mut stream = sqlx::query_as::<_, (String, Vec)>( " @@ -207,6 +223,14 @@ impl crate::ReaderDbManager for crate::PostgresDBManager { "state_changes_data", ]) .inc(); + crate::metrics::ACCOUTS_DATABASE_READ_QUERIES + .with_label_values(&[ + account_id.as_ref(), + &shard_id_pool.shard_id.to_string(), + method_name, + "state_changes_data", + ]) + .inc(); let mut items = std::collections::HashMap::new(); let mut stream = sqlx::query_as::<_, (String, Vec)>( " @@ -264,6 +288,14 @@ impl crate::ReaderDbManager for crate::PostgresDBManager { "state_changes_data", ]) .inc(); + crate::metrics::ACCOUTS_DATABASE_READ_QUERIES + .with_label_values(&[ + account_id.as_ref(), + &shard_id_pool.shard_id.to_string(), + method_name, + "state_changes_data", + ]) + .inc(); let (data_value,): (Vec,) = sqlx::query_as( " SELECT data_value @@ -297,6 +329,14 @@ impl crate::ReaderDbManager for crate::PostgresDBManager { "state_changes_account", ]) .inc(); + crate::metrics::ACCOUTS_DATABASE_READ_QUERIES + .with_label_values(&[ + account_id.as_ref(), + &shard_id_pool.shard_id.to_string(), + method_name, + "state_changes_account", + ]) + .inc(); let (block_height, block_hash, data_value): (bigdecimal::BigDecimal, String, Vec) = sqlx::query_as( " @@ -334,6 +374,14 @@ impl crate::ReaderDbManager for crate::PostgresDBManager { "state_changes_contract", ]) .inc(); + crate::metrics::ACCOUTS_DATABASE_READ_QUERIES + .with_label_values(&[ + account_id.as_ref(), + &shard_id_pool.shard_id.to_string(), + method_name, + "state_changes_contract", + ]) + .inc(); let (block_height, block_hash, contract_code): (bigdecimal::BigDecimal, String, Vec) = sqlx::query_as( " @@ -372,6 +420,14 @@ impl crate::ReaderDbManager for crate::PostgresDBManager { "state_changes_access_key", ]) .inc(); + crate::metrics::ACCOUTS_DATABASE_READ_QUERIES + .with_label_values(&[ + account_id.as_ref(), + &shard_id_pool.shard_id.to_string(), + method_name, + "state_changes_access_key", + ]) + .inc(); let key_data = borsh::to_vec(&public_key)?; let (block_height, block_hash, data_value): (bigdecimal::BigDecimal, String, Vec) = sqlx::query_as( @@ -412,6 +468,14 @@ impl crate::ReaderDbManager for crate::PostgresDBManager { "state_changes_access_key", ]) .inc(); + crate::metrics::ACCOUTS_DATABASE_READ_QUERIES + .with_label_values(&[ + account_id.as_ref(), + &shard_id_pool.shard_id.to_string(), + method_name, + "state_changes_access_key", + ]) + .inc(); let mut access_keys = vec![]; let mut stream = sqlx::query_as::<_, (String, Vec, bigdecimal::BigDecimal)>( " diff --git a/rpc-server/src/metrics.rs b/rpc-server/src/metrics.rs index 745b90a4..5af0d36e 100644 --- a/rpc-server/src/metrics.rs +++ b/rpc-server/src/metrics.rs @@ -121,13 +121,6 @@ lazy_static! { counter_vec }; - pub(crate) static ref LEGACY_DATABASE_TX_DETAILS: IntCounterVec = register_int_counter_vec( - "legacy_database_tx_details", - "Total number of calls to the legacy database for transaction details", - // This declares a label named `lookup_type` to differentiate "finished" and "in_progress" transaction lookups - &["lookup_type"] - ).unwrap(); - // Error metrics // 0: ReadRPC success, NEAR RPC success" // 1: ReadRPC success, NEAR RPC error"