Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick necessary changes and avoid to update nearcore #378

Merged
merged 4 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased](https://github.com/near/read-rpc/compare/main...develop)

### What's Changed
* Corrected state size calculation logic.
* Integrated cargo_pkg_version metric to reflect the current server version.
* Delete unnecessary debug logs about update blocks by finalities

## [0.3.1](https://github.com/near/read-rpc/releases/tag/v0.3.1)

### Supported Nearcore Version
- nearcore v2.3.0
- rust v1.81.0

### What's Changed
* Improved bulk insertion of state_changes, reducing database requests from hundreds to a maximum of 7 per block.
* Configuration improvement. Create default config.toml on start application to loaded parameters from the environment variables.
Expand Down
6 changes: 6 additions & 0 deletions database/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,10 @@ lazy_static! {
&["method_name", "table_name"]
)
.unwrap();
pub(crate) static ref ACCOUTS_DATABASE_READ_QUERIES: IntCounterVec = register_int_counter_vec(
"account_database_read_queries_counter",
"Total number of accounts database reads queries by method_name and table_name",
&["account_id", "shard_id", "method_name", "table_name"]
)
.unwrap();
}
64 changes: 64 additions & 0 deletions database/src/postgres/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
"state_changes_data",
])
.inc();
crate::metrics::ACCOUTS_DATABASE_READ_QUERIES
.with_label_values(&[
account_id.as_ref(),
&shard_id_pool.shard_id.to_string(),
method_name,
"state_changes_data",
])
.inc();
let page_state = if let Some(page_state_token) = page_token {
borsh::from_slice::<crate::postgres::PageState>(&hex::decode(page_state_token)?)?
} else {
Expand Down Expand Up @@ -150,6 +158,14 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
"state_changes_data",
])
.inc();
crate::metrics::ACCOUTS_DATABASE_READ_QUERIES
.with_label_values(&[
account_id.as_ref(),
&shard_id_pool.shard_id.to_string(),
method_name,
"state_changes_data",
])
.inc();
let mut items = std::collections::HashMap::new();
let mut stream = sqlx::query_as::<_, (String, Vec<u8>)>(
"
Expand Down Expand Up @@ -207,6 +223,14 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
"state_changes_data",
])
.inc();
crate::metrics::ACCOUTS_DATABASE_READ_QUERIES
.with_label_values(&[
account_id.as_ref(),
&shard_id_pool.shard_id.to_string(),
method_name,
"state_changes_data",
])
.inc();
let mut items = std::collections::HashMap::new();
let mut stream = sqlx::query_as::<_, (String, Vec<u8>)>(
"
Expand Down Expand Up @@ -264,6 +288,14 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
"state_changes_data",
])
.inc();
crate::metrics::ACCOUTS_DATABASE_READ_QUERIES
.with_label_values(&[
account_id.as_ref(),
&shard_id_pool.shard_id.to_string(),
method_name,
"state_changes_data",
])
.inc();
let (data_value,): (Vec<u8>,) = sqlx::query_as(
"
SELECT data_value
Expand Down Expand Up @@ -297,6 +329,14 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
"state_changes_account",
])
.inc();
crate::metrics::ACCOUTS_DATABASE_READ_QUERIES
.with_label_values(&[
account_id.as_ref(),
&shard_id_pool.shard_id.to_string(),
method_name,
"state_changes_account",
])
.inc();
let (block_height, block_hash, data_value): (bigdecimal::BigDecimal, String, Vec<u8>) =
sqlx::query_as(
"
Expand Down Expand Up @@ -334,6 +374,14 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
"state_changes_contract",
])
.inc();
crate::metrics::ACCOUTS_DATABASE_READ_QUERIES
.with_label_values(&[
account_id.as_ref(),
&shard_id_pool.shard_id.to_string(),
method_name,
"state_changes_contract",
])
.inc();
let (block_height, block_hash, contract_code): (bigdecimal::BigDecimal, String, Vec<u8>) =
sqlx::query_as(
"
Expand Down Expand Up @@ -372,6 +420,14 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
"state_changes_access_key",
])
.inc();
crate::metrics::ACCOUTS_DATABASE_READ_QUERIES
.with_label_values(&[
account_id.as_ref(),
&shard_id_pool.shard_id.to_string(),
method_name,
"state_changes_access_key",
])
.inc();
let key_data = borsh::to_vec(&public_key)?;
let (block_height, block_hash, data_value): (bigdecimal::BigDecimal, String, Vec<u8>) =
sqlx::query_as(
Expand Down Expand Up @@ -412,6 +468,14 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
"state_changes_access_key",
])
.inc();
crate::metrics::ACCOUTS_DATABASE_READ_QUERIES
.with_label_values(&[
account_id.as_ref(),
&shard_id_pool.shard_id.to_string(),
method_name,
"state_changes_access_key",
])
.inc();
let mut access_keys = vec![];
let mut stream = sqlx::query_as::<_, (String, Vec<u8>, bigdecimal::BigDecimal)>(
"
Expand Down
4 changes: 4 additions & 0 deletions rpc-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ impl ServerContext {
let compiled_contract_code_cache =
std::sync::Arc::new(CompiledCodeCache::new(contract_code_cache_size_in_bytes));

crate::metrics::CARGO_PKG_VERSION
.with_label_values(&[NEARD_VERSION])
.inc();

Ok(Self {
s3_client,
db_manager: std::sync::Arc::new(Box::new(db_manager)),
Expand Down
6 changes: 6 additions & 0 deletions rpc-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,12 @@ async fn rpc_handler(
})
.await
}
"EXPERIMENTAL_congestion_level" => {
process_method_call(request, |params| {
modules::blocks::methods::congestion_level(data, params)
})
.await
}
"EXPERIMENTAL_genesis_config" => {
process_method_call(request, |_: ()| {
modules::network::methods::genesis_config(data)
Expand Down
15 changes: 8 additions & 7 deletions rpc-server/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use actix_web::{get, Responder};
use prometheus::{Encoder, IntCounterVec, IntGauge, IntGaugeVec, Opts};
use prometheus::{CounterVec, Encoder, IntCounterVec, IntGauge, IntGaugeVec, Opts};

type Result<T, E> = std::result::Result<T, E>;

Expand Down Expand Up @@ -113,12 +113,13 @@ lazy_static! {
"Optimistic updating status. 0: working, 1: not working",
).unwrap();

pub(crate) static ref LEGACY_DATABASE_TX_DETAILS: IntCounterVec = register_int_counter_vec(
"legacy_database_tx_details",
"Total number of calls to the legacy database for transaction details",
// This declares a label named `lookup_type` to differentiate "finished" and "in_progress" transaction lookups
&["lookup_type"]
).unwrap();
pub(crate) static ref CARGO_PKG_VERSION: CounterVec = {
let opts = Opts::new("cargo_pkg_version", "Cargo package version. This is used to track the version of the running server.")
.variable_label("version");
let counter_vec = CounterVec::new(opts, &["version"]).expect("metric can be created");
prometheus::register(Box::new(counter_vec.clone())).unwrap();
counter_vec
};

// Error metrics
// 0: ReadRPC success, NEAR RPC success"
Expand Down
63 changes: 58 additions & 5 deletions rpc-server/src/modules/blocks/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub async fn chunk(
near_jsonrpc::primitives::types::chunks::RpcChunkError,
> {
tracing::debug!("`chunk` called with parameters: {:?}", request_data);
let chunk_result = fetch_chunk(&data, request_data.chunk_reference.clone()).await;
let chunk_result = fetch_chunk(&data, request_data.chunk_reference.clone(), "chunk").await;
#[cfg(feature = "shadow-data-consistency")]
{
crate::utils::shadow_compare_results_handler(
Expand All @@ -67,6 +67,58 @@ pub async fn chunk(
chunk_result
}

// EXPERIMENTAL_congestion_level rpc method implementation
#[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip(data)))]
pub async fn congestion_level(
data: Data<ServerContext>,
request_data: near_jsonrpc::primitives::types::congestion::RpcCongestionLevelRequest,
) -> Result<
near_jsonrpc::primitives::types::congestion::RpcCongestionLevelResponse,
near_jsonrpc::primitives::types::congestion::RpcCongestionLevelError,
> {
tracing::debug!(
"`EXPERIMENTAL_congestion_level` called with parameters: {:?}",
request_data
);
let chunk_view = fetch_chunk(
&data,
request_data.chunk_reference.clone(),
"EXPERIMENTAL_congestion_level",
)
.await?
.chunk_view;
let config_result = crate::modules::network::methods::protocol_config_call(
&data,
near_primitives::types::BlockReference::BlockId(near_primitives::types::BlockId::Height(
chunk_view.header.height_included,
)),
"EXPERIMENTAL_congestion_level",
)
.await;
let config = config_result.map_err(|err: near_jsonrpc::primitives::types::config::RpcProtocolConfigError| match err {
near_jsonrpc::primitives::types::config::RpcProtocolConfigError::UnknownBlock { error_message } => {
near_jsonrpc::primitives::types::congestion::RpcCongestionLevelError::UnknownBlock {
error_message,
}
}
near_jsonrpc::primitives::types::config::RpcProtocolConfigError::InternalError { error_message } => {
near_jsonrpc::primitives::types::congestion::RpcCongestionLevelError::InternalError {
error_message,
}
}
})?;
let congestion_level = chunk_view
.header
.congestion_info
.map(|info| info.congestion_level(config.runtime_config.congestion_control_config))
.unwrap_or(0.0);
Ok(
near_jsonrpc::primitives::types::congestion::RpcCongestionLevelResponse {
congestion_level,
},
)
}

/// `EXPERIMENTAL_changes` rpc method implementation
/// calls proxy_rpc_call to get `EXPERIMENTAL_changes` from near-rpc if request parameters not supported by read-rpc
/// as example: BlockReference for Finality::None is not supported by read-rpc
Expand Down Expand Up @@ -379,6 +431,7 @@ pub async fn fetch_block(
pub async fn fetch_chunk(
data: &Data<ServerContext>,
chunk_reference: near_jsonrpc::primitives::types::chunks::ChunkReference,
method_name: &str,
) -> Result<
near_jsonrpc::primitives::types::chunks::RpcChunkResponse,
near_jsonrpc::primitives::types::chunks::RpcChunkError,
Expand All @@ -400,7 +453,7 @@ pub async fn fetch_chunk(
}
near_primitives::types::BlockId::Hash(block_hash) => data
.db_manager
.get_block_height_by_hash(block_hash, "chunk")
.get_block_height_by_hash(block_hash, method_name)
.await
.map_err(|err| {
tracing::error!("Failed to fetch block by hash: {}", err);
Expand All @@ -412,7 +465,7 @@ pub async fn fetch_chunk(
// Check if the chunk stored in block with the given height
if let Ok(block_height_shard_id) = data
.db_manager
.get_block_by_height_and_shard_id(block_height, shard_id, "chunk")
.get_block_by_height_and_shard_id(block_height, shard_id, method_name)
.await
{
(block_height_shard_id.0, block_height_shard_id.1)
Expand All @@ -422,7 +475,7 @@ pub async fn fetch_chunk(
}
near_jsonrpc::primitives::types::chunks::ChunkReference::ChunkHash { chunk_id } => data
.db_manager
.get_block_by_chunk_hash(chunk_id, "chunk")
.get_block_by_chunk_hash(chunk_id, method_name)
.await
.map_err(
|_err| near_jsonrpc::primitives::types::chunks::RpcChunkError::UnknownChunk {
Expand All @@ -444,7 +497,7 @@ pub async fn fetch_chunk(
&near_primitives::types::BlockReference::BlockId(near_primitives::types::BlockId::Height(
block_height,
)),
"chunk",
method_name,
Some(block_height),
)
.await;
Expand Down
9 changes: 0 additions & 9 deletions rpc-server/src/modules/blocks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,10 +335,6 @@ impl BlocksInfoByFinality {
// Update final block info in the cache.
// Executes every second.
pub async fn update_final_block(&self, block_info: BlockInfo) {
tracing::debug!(
"Update final block info: {:?}",
block_info.block_cache.block_height
);
let mut final_block_lock = self.final_block.write().await;
final_block_lock.block_cache = block_info.block_cache;
final_block_lock.block_view = block_info.block_view;
Expand All @@ -348,11 +344,6 @@ impl BlocksInfoByFinality {
// Update optimistic block changes and optimistic block info in the cache.
// Executes every second.
pub async fn update_optimistic_block(&self, block_info: BlockInfo) {
tracing::debug!(
"Update optimistic block info: {:?}",
block_info.block_cache.block_height
);

let mut optimistic_changes_lock = self.optimistic_changes.write().await;
optimistic_changes_lock.account_changes = block_info.changes_in_block_account_map().await;

Expand Down
25 changes: 15 additions & 10 deletions rpc-server/src/modules/network/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,12 @@ pub async fn protocol_config(
request_data
);

let config_view = protocol_config_call(&data, request_data.block_reference.clone()).await;
let config_view = protocol_config_call(
&data,
request_data.block_reference.clone(),
"EXPERIMENTAL_protocol_config",
)
.await;

#[cfg(feature = "shadow-data-consistency")]
{
Expand Down Expand Up @@ -366,21 +371,21 @@ async fn validators_call(
Ok(validators.validators_info)
}

async fn protocol_config_call(
pub async fn protocol_config_call(
data: &Data<ServerContext>,
block_reference: near_primitives::types::BlockReference,
method_name: &str,
) -> Result<
near_chain_configs::ProtocolConfigView,
near_jsonrpc::primitives::types::config::RpcProtocolConfigError,
> {
let protocol_version =
get_protocol_version(data, block_reference, "EXPERIMENTAL_protocol_config")
.await
.map_err(|err| {
near_jsonrpc::primitives::types::config::RpcProtocolConfigError::UnknownBlock {
error_message: err.to_string(),
}
})?;
let protocol_version = get_protocol_version(data, block_reference, method_name)
.await
.map_err(|err| {
near_jsonrpc::primitives::types::config::RpcProtocolConfigError::UnknownBlock {
error_message: err.to_string(),
}
})?;
// Stores runtime config for each protocol version
// Create store of runtime configs for the given chain id.
//
Expand Down
Loading
Loading