diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ebd1936..4fb67f6c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased](https://github.com/near/read-rpc/compare/main...develop) +### What's Changed +* Migrate from lake data to fastnear data +* Add metrics to calculate the number of blocks which fetched from the cache and fastnear + ## [0.3.3](https://github.com/near/read-rpc/releases/tag/v0.3.3) ### Supported Nearcore Version diff --git a/Cargo.lock b/Cargo.lock index 3bdc2130..64e5eab9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1574,9 +1574,6 @@ name = "configuration" version = "0.3.3" dependencies = [ "anyhow", - "aws-credential-types", - "aws-sdk-s3", - "aws-types", "dotenv", "google-cloud-storage", "lazy_static", diff --git a/configuration/Cargo.toml b/configuration/Cargo.toml index 6803c0dc..70da9cb4 100644 --- a/configuration/Cargo.toml +++ b/configuration/Cargo.toml @@ -9,9 +9,6 @@ license.workspace = true [dependencies] anyhow = "1.0.70" -aws-credential-types = "1.1.4" -aws-sdk-s3 = { version = "1.14.0", features = ["behavior-version-latest"] } -aws-types = "1.1.4" dotenv = "0.15.0" google-cloud-storage = "0.23.0" lazy_static = "1.4.0" diff --git a/configuration/src/configs/lake.rs b/configuration/src/configs/lake.rs index f8d2335b..a4a74cb0 100644 --- a/configuration/src/configs/lake.rs +++ b/configuration/src/configs/lake.rs @@ -1,86 +1,55 @@ -use aws_sdk_s3::config::StalledStreamProtectionConfig; +use crate::configs::deserialize_optional_data_or_env; use near_lake_framework::near_indexer_primitives::near_primitives; use serde_derive::Deserialize; -use crate::configs::{deserialize_optional_data_or_env, required_value_or_panic}; - #[derive(Debug, Clone)] pub struct LakeConfig { - pub aws_access_key_id: String, - pub aws_secret_access_key: String, - pub aws_default_region: String, - pub aws_bucket_name: String, + pub num_threads: Option, } impl LakeConfig { - pub async fn s3_config(&self) -> aws_sdk_s3::Config { - let credentials = aws_credential_types::Credentials::new( - &self.aws_access_key_id, - &self.aws_secret_access_key, - None, - None, - "", - ); - aws_sdk_s3::Config::builder() - .stalled_stream_protection(StalledStreamProtectionConfig::disabled()) - .credentials_provider(credentials) - .region(aws_types::region::Region::new( - self.aws_default_region.clone(), - )) - .build() - } - pub async fn lake_config( &self, start_block_height: near_primitives::types::BlockHeight, - ) -> anyhow::Result { - let config_builder = near_lake_framework::LakeConfigBuilder::default(); + chain_id: crate::ChainId, + ) -> anyhow::Result { + let mut config_builder = near_lake_framework::FastNearConfigBuilder::default(); + match chain_id { + crate::ChainId::Mainnet => config_builder = config_builder.mainnet(), + // Testnet is the default chain for other chain_id + _ => config_builder = config_builder.testnet(), + }; + if let Some(num_threads) = self.num_threads { + config_builder = config_builder.num_threads(num_threads); + }; Ok(config_builder - .s3_config(self.s3_config().await) - .s3_region_name(&self.aws_default_region) - .s3_bucket_name(&self.aws_bucket_name) .start_block_height(start_block_height) - .build() - .expect("Failed to build LakeConfig")) + .build()?) } - pub async fn lake_s3_client(&self) -> near_lake_framework::LakeS3Client { - let s3_config = self.s3_config().await; - near_lake_framework::LakeS3Client::new(aws_sdk_s3::Client::from_conf(s3_config)) + pub async fn lake_client( + &self, + chain_id: crate::ChainId, + ) -> anyhow::Result { + let fast_near_endpoint = match chain_id { + crate::ChainId::Mainnet => String::from("https://mainnet.neardata.xyz"), + // Testnet is the default chain for other chain_id + _ => String::from("https://testnet.neardata.xyz"), + }; + Ok(near_lake_framework::FastNearClient::new(fast_near_endpoint)) } } #[derive(Deserialize, Debug, Clone, Default)] pub struct CommonLakeConfig { #[serde(deserialize_with = "deserialize_optional_data_or_env", default)] - pub aws_access_key_id: Option, - #[serde(deserialize_with = "deserialize_optional_data_or_env", default)] - pub aws_secret_access_key: Option, - #[serde(deserialize_with = "deserialize_optional_data_or_env", default)] - pub aws_default_region: Option, - #[serde(deserialize_with = "deserialize_optional_data_or_env", default)] - pub aws_bucket_name: Option, + pub num_threads: Option, } impl From for LakeConfig { fn from(common_config: CommonLakeConfig) -> Self { Self { - aws_access_key_id: required_value_or_panic( - "aws_access_key_id", - common_config.aws_access_key_id, - ), - aws_secret_access_key: required_value_or_panic( - "aws_secret_access_key", - common_config.aws_secret_access_key, - ), - aws_default_region: required_value_or_panic( - "aws_default_region", - common_config.aws_default_region, - ), - aws_bucket_name: required_value_or_panic( - "aws_bucket_name", - common_config.aws_bucket_name, - ), + num_threads: common_config.num_threads, } } } diff --git a/configuration/src/default_env_configs.rs b/configuration/src/default_env_configs.rs index 469a96de..0268f72a 100644 --- a/configuration/src/default_env_configs.rs +++ b/configuration/src/default_env_configs.rs @@ -120,18 +120,9 @@ tracked_changes = "${TRACKED_CHANGES}" ### Lake framework configuration [lake_config] - -## Lake framework AWS access key id -aws_access_key_id = "${AWS_ACCESS_KEY_ID}" - -## Lake framework AWS secret access key -aws_secret_access_key = "${AWS_SECRET_ACCESS_KEY}" - -## Lake framework AWS default region -aws_default_region = "${AWS_DEFAULT_REGION}" - -## Lake framework bucket name -aws_bucket_name = "${AWS_BUCKET_NAME}" +# Number of threads to use for fetching data from fatnear +# Default: 2 * available threads +#num_threads = 8 ## Transaction details are stored in the Google Cloud Storage [tx_details_storage] diff --git a/rpc-server/src/config.rs b/rpc-server/src/config.rs index 85cefaef..aaa7c2f8 100644 --- a/rpc-server/src/config.rs +++ b/rpc-server/src/config.rs @@ -4,6 +4,7 @@ use futures::executor::block_on; use near_primitives::epoch_manager::{AllEpochConfig, EpochConfig}; use crate::modules::blocks::{BlocksInfoByFinality, CacheBlock}; +use crate::utils; static NEARD_VERSION: &str = env!("CARGO_PKG_VERSION"); static NEARD_BUILD: &str = env!("BUILD_VERSION"); @@ -20,8 +21,7 @@ pub struct GenesisInfo { impl GenesisInfo { pub async fn get( near_rpc_client: &crate::utils::JsonRpcClient, - s3_client: &near_lake_framework::LakeS3Client, - s3_bucket_name: &str, + fastnear_client: &near_lake_framework::FastNearClient, ) -> Self { tracing::info!("Get genesis config..."); let genesis_config = near_rpc_client @@ -32,25 +32,20 @@ impl GenesisInfo { .await .expect("Error to get genesis config"); - let genesis_block = near_lake_framework::s3::fetchers::fetch_block( - s3_client, - s3_bucket_name, - genesis_config.genesis_height, - ) - .await - .expect("Error to get genesis block"); + let genesis_block = + near_lake_framework::fastnear::fetchers::fetch_first_block(fastnear_client).await; Self { genesis_config, - genesis_block_cache: CacheBlock::from(&genesis_block), + genesis_block_cache: CacheBlock::from(&genesis_block.block), } } } #[derive(Clone)] pub struct ServerContext { - /// Lake s3 client - pub s3_client: near_lake_framework::LakeS3Client, + /// Fastnear client + pub fastnear_client: near_lake_framework::FastNearClient, /// Database manager pub db_manager: std::sync::Arc>, /// TransactionDetails storage @@ -61,8 +56,6 @@ pub struct ServerContext { pub genesis_info: GenesisInfo, /// Near rpc client pub near_rpc_client: crate::utils::JsonRpcClient, - /// AWS s3 lake bucket name - pub s3_bucket_name: String, /// Blocks cache pub blocks_cache: std::sync::Arc>, /// Final block info include final_block_cache and current_validators_info @@ -89,27 +82,36 @@ pub struct ServerContext { } impl ServerContext { - pub async fn init( - rpc_server_config: configuration::RpcServerConfig, - near_rpc_client: crate::utils::JsonRpcClient, - ) -> anyhow::Result { + pub async fn init(rpc_server_config: configuration::RpcServerConfig) -> anyhow::Result { let contract_code_cache_size_in_bytes = - crate::utils::gigabytes_to_bytes(rpc_server_config.general.contract_code_cache_size) - .await; + utils::gigabytes_to_bytes(rpc_server_config.general.contract_code_cache_size).await; let contract_code_cache = std::sync::Arc::new(crate::cache::RwLockLruMemoryCache::new( contract_code_cache_size_in_bytes, )); let block_cache_size_in_bytes = - crate::utils::gigabytes_to_bytes(rpc_server_config.general.block_cache_size).await; + utils::gigabytes_to_bytes(rpc_server_config.general.block_cache_size).await; let blocks_cache = std::sync::Arc::new(crate::cache::RwLockLruMemoryCache::new( block_cache_size_in_bytes, )); - - let blocks_info_by_finality = - std::sync::Arc::new(BlocksInfoByFinality::new(&near_rpc_client, &blocks_cache).await); - - let s3_client = rpc_server_config.lake_config.lake_s3_client().await; + let near_rpc_client = utils::JsonRpcClient::new( + rpc_server_config.general.near_rpc_url.clone(), + rpc_server_config.general.near_archival_rpc_url.clone(), + ); + // We want to set a custom referer to let NEAR JSON RPC nodes know that we are a read-rpc instance + let near_rpc_client = near_rpc_client.header( + "Referer".to_string(), + rpc_server_config.general.referer_header_value.clone(), + )?; + + let fastnear_client = rpc_server_config + .lake_config + .lake_client(rpc_server_config.general.chain_id) + .await?; + + let blocks_info_by_finality = std::sync::Arc::new( + BlocksInfoByFinality::new(&near_rpc_client, &fastnear_client).await, + ); let tx_details_storage = tx_details_storage::TxDetailsStorage::new( rpc_server_config.tx_details_storage.storage_client().await, @@ -124,12 +126,7 @@ impl ServerContext { }) .ok(); - let genesis_info = GenesisInfo::get( - &near_rpc_client, - &s3_client, - &rpc_server_config.lake_config.aws_bucket_name, - ) - .await; + let genesis_info = GenesisInfo::get(&near_rpc_client, &fastnear_client).await; let default_epoch_config = EpochConfig::from(&genesis_info.genesis_config); let all_epoch_config = AllEpochConfig::new( @@ -159,13 +156,12 @@ impl ServerContext { .inc(); Ok(Self { - s3_client, + fastnear_client, db_manager: std::sync::Arc::new(Box::new(db_manager)), tx_details_storage: std::sync::Arc::new(tx_details_storage), tx_cache_storage, genesis_info, near_rpc_client, - s3_bucket_name: rpc_server_config.lake_config.aws_bucket_name.clone(), blocks_cache, blocks_info_by_finality, compiled_contract_code_cache, diff --git a/rpc-server/src/main.rs b/rpc-server/src/main.rs index 11d869d8..5928978b 100644 --- a/rpc-server/src/main.rs +++ b/rpc-server/src/main.rs @@ -328,79 +328,16 @@ async fn main() -> anyhow::Result<()> { let rpc_server_config = configuration::read_configuration::().await?; - let near_rpc_client = utils::JsonRpcClient::new( - rpc_server_config.general.near_rpc_url.clone(), - rpc_server_config.general.near_archival_rpc_url.clone(), - ); - // We want to set a custom referer to let NEAR JSON RPC nodes know that we are a read-rpc instance - let near_rpc_client = near_rpc_client.header( - "Referer".to_string(), - rpc_server_config.general.referer_header_value.clone(), - )?; - - let server_port = rpc_server_config.general.server_port; - - let server_context = actix_web::web::Data::new( - config::ServerContext::init(rpc_server_config.clone(), near_rpc_client.clone()).await?, - ); - - let blocks_cache_clone = std::sync::Arc::clone(&server_context.blocks_cache); - let blocks_info_by_finality_clone = - std::sync::Arc::clone(&server_context.blocks_info_by_finality); - let near_rpc_client_clone = near_rpc_client.clone(); - - let finality_blocks_storage = - cache_storage::BlocksByFinalityCache::new(rpc_server_config.general.redis_url.to_string()) - .await - .map_err(|err| { - crate::metrics::OPTIMISTIC_UPDATING.set_not_working(); - tracing::warn!("Failed to connect to Redis: {:?}", err); - }) - .ok(); + let server_context = + actix_web::web::Data::new(config::ServerContext::init(rpc_server_config.clone()).await?); - // We need to update final block from Redis and Lake - // Because we can't be sure that Redis has the latest block - // And Lake can be used as a backup source - - // Update final block from Redis if Redis is available - if let Some(finality_blocks_storage) = finality_blocks_storage.clone() { - tokio::spawn(async move { - utils::update_final_block_regularly_from_redis( - blocks_cache_clone, - blocks_info_by_finality_clone, - finality_blocks_storage, - near_rpc_client_clone, - ) - .await - }); - } - - // Update final block from Lake - let blocks_cache_clone = std::sync::Arc::clone(&server_context.blocks_cache); - let blocks_info_by_finality_clone = - std::sync::Arc::clone(&server_context.blocks_info_by_finality); - tokio::spawn(async move { - utils::update_final_block_regularly_from_lake( - blocks_cache_clone, - blocks_info_by_finality_clone, - rpc_server_config, - near_rpc_client, - ) - .await - }); - - // Update optimistic block from Redis if Redis is available - if let Some(finality_blocks_storage) = finality_blocks_storage { - let blocks_info_by_finality = - std::sync::Arc::clone(&server_context.blocks_info_by_finality); - tokio::spawn(async move { - utils::update_optimistic_block_regularly( - blocks_info_by_finality, - finality_blocks_storage, - ) - .await - }); - } + utils::task_regularly_update_blocks_by_finality( + std::sync::Arc::clone(&server_context.blocks_info_by_finality), + std::sync::Arc::clone(&server_context.blocks_cache), + server_context.fastnear_client.clone(), + server_context.near_rpc_client.clone(), + ) + .await; actix_web::HttpServer::new(move || { let cors = actix_cors::Cors::permissive(); @@ -413,7 +350,10 @@ async fn main() -> anyhow::Result<()> { .service(metrics::get_metrics) .service(health::get_health_status) }) - .bind(format!("0.0.0.0:{:0>5}", server_port))? + .bind(format!( + "0.0.0.0:{:0>5}", + rpc_server_config.general.server_port + ))? .run() .await?; diff --git a/rpc-server/src/metrics.rs b/rpc-server/src/metrics.rs index 5af0d36e..e649489b 100644 --- a/rpc-server/src/metrics.rs +++ b/rpc-server/src/metrics.rs @@ -120,6 +120,11 @@ lazy_static! { prometheus::register(Box::new(counter_vec.clone())).unwrap(); counter_vec }; + pub(crate) static ref REQUESTS_BLOCKS_COUNTERS: IntCounterVec = register_int_counter_vec( + "requests_blocks_counters", + "Total number of requests blocks from Lake and Cache", + &["method_name", "source"] // This declares a label named `method_name` and `source`(lake or cache) + ).unwrap(); // Error metrics // 0: ReadRPC success, NEAR RPC success" diff --git a/rpc-server/src/modules/blocks/methods.rs b/rpc-server/src/modules/blocks/methods.rs index 5a7a5dfb..76ed0ad7 100644 --- a/rpc-server/src/modules/blocks/methods.rs +++ b/rpc-server/src/modules/blocks/methods.rs @@ -4,7 +4,8 @@ use near_primitives::views::StateChangeValueView; use crate::config::ServerContext; use crate::modules::blocks::utils::{ - check_block_height, fetch_block_from_cache_or_get, fetch_chunk_from_s3, is_matching_change, + check_block_height, fetch_block_from_cache_or_get, fetch_chunk_from_fastnear, + is_matching_change, }; /// `block` rpc method implementation @@ -260,7 +261,13 @@ async fn changes_in_block_call( } })?; - let result = fetch_changes_in_block(&data, cache_block, ¶ms.block_reference).await; + let result = fetch_changes_in_block( + &data, + cache_block, + ¶ms.block_reference, + "EXPERIMENTAL_changes_in_block", + ) + .await; #[cfg(feature = "shadow-data-consistency")] { if let near_primitives::types::BlockReference::Finality(_) = params.block_reference { @@ -305,6 +312,7 @@ async fn changes_in_block_by_type_call( cache_block, ¶ms.state_changes_request, ¶ms.block_reference, + "EXPERIMENTAL_changes", ) .await; @@ -366,6 +374,9 @@ pub async fn fetch_block( return match finality { near_primitives::types::Finality::Final | near_primitives::types::Finality::DoomSlug => { + crate::metrics::REQUESTS_BLOCKS_COUNTERS + .with_label_values(&[method_name, "cache"]) + .inc(); let block_view = data.blocks_info_by_finality.final_block_view().await; Ok(near_jsonrpc::primitives::types::blocks::RpcBlockResponse { block_view }) } @@ -378,6 +389,9 @@ pub async fn fetch_block( }, ) } else { + crate::metrics::REQUESTS_BLOCKS_COUNTERS + .with_label_values(&[method_name, "cache"]) + .inc(); let block_view = data.blocks_info_by_finality.optimistic_block_view().await; Ok( near_jsonrpc::primitives::types::blocks::RpcBlockResponse { @@ -401,6 +415,9 @@ pub async fn fetch_block( .await .block_height { + crate::metrics::REQUESTS_BLOCKS_COUNTERS + .with_label_values(&[method_name, "cache"]) + .inc(); data.blocks_info_by_finality.final_block_view().await } else if block_height == data @@ -409,16 +426,21 @@ pub async fn fetch_block( .await .block_height { + crate::metrics::REQUESTS_BLOCKS_COUNTERS + .with_label_values(&[method_name, "cache"]) + .inc(); data.blocks_info_by_finality.optimistic_block_view().await } else { - near_lake_framework::s3::fetchers::fetch_block( - &data.s3_client, - &data.s3_bucket_name, + crate::metrics::REQUESTS_BLOCKS_COUNTERS + .with_label_values(&[method_name, "lake"]) + .inc(); + near_lake_framework::fastnear::fetchers::fetch_block_or_retry( + &data.fastnear_client, block_height, ) .await .map_err(|err| { - tracing::error!("Failed to fetch block from S3: {}", err); + tracing::error!("Failed to fetch block from fastnear: {}", err); near_jsonrpc::primitives::types::blocks::RpcBlockError::UnknownBlock { error_message: format!("BLOCK HEIGHT: {:?}", block_height), } @@ -484,13 +506,11 @@ pub async fn fetch_chunk( ) .map(|block_height_shard_id| (block_height_shard_id.0, block_height_shard_id.1))?, }; - let chunk_view = fetch_chunk_from_s3( - &data.s3_client, - &data.s3_bucket_name, - block_height, - shard_id.into(), - ) - .await?; + crate::metrics::REQUESTS_BLOCKS_COUNTERS + .with_label_values(&[method_name, "lake"]) + .inc(); + let chunk_view = + fetch_chunk_from_fastnear(&data.fastnear_client, block_height, shard_id.into()).await?; // increase block category metrics crate::metrics::increase_request_category_metrics( data, @@ -510,11 +530,12 @@ async fn fetch_changes_in_block( data: &Data, cache_block: crate::modules::blocks::CacheBlock, block_reference: &near_primitives::types::BlockReference, + method_name: &str, ) -> Result< near_jsonrpc::primitives::types::changes::RpcStateChangesInBlockByTypeResponse, near_jsonrpc::primitives::types::changes::RpcStateChangesError, > { - let trie_keys = fetch_state_changes(data, cache_block, block_reference) + let trie_keys = fetch_state_changes(data, cache_block, block_reference, method_name) .await .map_err(|err| { near_jsonrpc::primitives::types::changes::RpcStateChangesError::UnknownBlock { @@ -598,11 +619,12 @@ async fn fetch_changes_in_block_by_type( cache_block: crate::modules::blocks::CacheBlock, state_changes_request: &near_primitives::views::StateChangesRequestView, block_reference: &near_primitives::types::BlockReference, + method_name: &str, ) -> Result< near_jsonrpc::primitives::types::changes::RpcStateChangesInBlockResponse, near_jsonrpc::primitives::types::changes::RpcStateChangesError, > { - let changes = fetch_state_changes(data, cache_block, block_reference) + let changes = fetch_state_changes(data, cache_block, block_reference, method_name) .await .map_err(|err| { near_jsonrpc::primitives::types::changes::RpcStateChangesError::UnknownBlock { @@ -629,6 +651,7 @@ async fn fetch_state_changes( data: &Data, cache_block: crate::modules::blocks::CacheBlock, block_reference: &near_primitives::types::BlockReference, + method_name: &str, ) -> anyhow::Result { if let near_primitives::types::BlockReference::Finality(finality) = block_reference { match finality { @@ -650,7 +673,7 @@ async fn fetch_state_changes( } } } else { - Ok(fetch_shards_by_cache_block(data, cache_block) + Ok(fetch_shards_by_cache_block(data, cache_block, method_name) .await? .into_iter() .flat_map(|shard| shard.state_changes) @@ -663,28 +686,21 @@ async fn fetch_state_changes( async fn fetch_shards_by_cache_block( data: &Data, cache_block: crate::modules::blocks::CacheBlock, + method_name: &str, ) -> anyhow::Result> { - let fetch_shards_futures = (0..cache_block.chunks_included) - .collect::>() - .into_iter() - .map(|shard_id| { - near_lake_framework::s3::fetchers::fetch_shard( - &data.s3_client, - &data.s3_bucket_name, - cache_block.block_height, - shard_id, - ) - }); - - futures::future::join_all(fetch_shards_futures) - .await - .into_iter() - .collect::>() - .map_err(|err| { - anyhow::anyhow!( - "Failed to fetch shards for block {} with error: {}", - cache_block.block_height, - err - ) - }) + crate::metrics::REQUESTS_BLOCKS_COUNTERS + .with_label_values(&[method_name, "lake"]) + .inc(); + match near_lake_framework::fastnear::fetchers::fetch_streamer_message( + &data.fastnear_client, + cache_block.block_height, + ) + .await + { + Some(streamer_message) => Ok(streamer_message.shards), + None => Err(anyhow::anyhow!( + "Failed to fetch shards for block {}", + cache_block.block_height, + )), + } } diff --git a/rpc-server/src/modules/blocks/mod.rs b/rpc-server/src/modules/blocks/mod.rs index e4fac836..7ba35bde 100644 --- a/rpc-server/src/modules/blocks/mod.rs +++ b/rpc-server/src/modules/blocks/mod.rs @@ -10,7 +10,6 @@ pub struct CacheBlock { pub block_timestamp: u64, pub gas_price: near_primitives::types::Balance, pub latest_protocol_version: near_primitives::types::ProtocolVersion, - pub chunks_included: u64, pub state_root: near_primitives::hash::CryptoHash, pub epoch_id: near_primitives::hash::CryptoHash, } @@ -61,7 +60,6 @@ impl From<&near_primitives::views::BlockView> for CacheBlock { block_timestamp: block.header.timestamp, gas_price: block.header.gas_price, latest_protocol_version: block.header.latest_protocol_version, - chunks_included: block.header.chunks_included, state_root: block.header.prev_state_root, epoch_id: block.header.epoch_id, } @@ -76,15 +74,6 @@ pub struct BlockInfo { } impl BlockInfo { - // Create new BlockInfo from BlockView. this method is useful only for start rpc-server. - pub async fn new_from_block_view(block_view: near_primitives::views::BlockView) -> Self { - Self { - block_cache: CacheBlock::from(&block_view), - block_view, - changes: vec![], // We left changes empty because block_view doesn't contain state changes. - } - } - // Create new BlockInfo from StreamerMessage. // This is using to update final and optimistic blocks regularly. pub async fn new_from_streamer_message( @@ -296,38 +285,30 @@ pub struct BlocksInfoByFinality { impl BlocksInfoByFinality { pub async fn new( near_rpc_client: &crate::utils::JsonRpcClient, - blocks_cache: &std::sync::Arc>, + fast_near_client: &near_lake_framework::FastNearClient, ) -> Self { - let final_block_future = crate::utils::get_final_block(near_rpc_client, false); - let optimistic_block_future = crate::utils::get_final_block(near_rpc_client, true); - let validators_future = crate::utils::get_current_validators(near_rpc_client); - - let (final_block, optimistic_block, validators) = futures::try_join!( - final_block_future, - optimistic_block_future, - validators_future, - ) - .map_err(|err| { - tracing::error!("Error to fetch final block info: {:?}", err); - err - }) - .expect("Error to get final block info"); - - blocks_cache - .put(final_block.header.height, CacheBlock::from(&final_block)) - .await; + let final_block = + near_lake_framework::fastnear::fetchers::fetch_last_block(fast_near_client).await; + let optimistic_block = + near_lake_framework::fastnear::fetchers::fetch_optimistic_block(fast_near_client).await; + let validators = crate::utils::get_current_validators(near_rpc_client) + .await + .expect("Failed to get current validators"); + let protocol_version = crate::utils::get_current_protocol_version(near_rpc_client) + .await + .expect("Failed to get current protocol version"); Self { final_block: futures_locks::RwLock::new( - BlockInfo::new_from_block_view(final_block).await, + BlockInfo::new_from_streamer_message(final_block).await, ), optimistic_block: futures_locks::RwLock::new( - BlockInfo::new_from_block_view(optimistic_block).await, + BlockInfo::new_from_streamer_message(optimistic_block).await, ), optimistic_changes: futures_locks::RwLock::new(OptimisticChanges::new()), current_validators: futures_locks::RwLock::new(CurrentValidatorInfo { validators }), current_protocol_version: futures_locks::RwLock::new(CurrentProtocolVersion { - protocol_version: near_primitives::version::PROTOCOL_VERSION, + protocol_version, }), } } @@ -359,18 +340,13 @@ impl BlocksInfoByFinality { &self, near_rpc_client: &crate::utils::JsonRpcClient, ) -> anyhow::Result<()> { - self.current_validators.write().await.validators = - crate::utils::get_current_validators(near_rpc_client).await?; - Ok(()) - } - - // Update current protocol version in the cache. - // This method executes when the protocol version changes. - pub async fn update_current_protocol_version( - &self, - protocol_version: near_primitives::types::ProtocolVersion, - ) -> anyhow::Result<()> { - self.current_protocol_version.write().await.protocol_version = protocol_version; + let current_validators_future = crate::utils::get_current_validators(near_rpc_client); + let current_protocol_version_future = + crate::utils::get_current_protocol_version(near_rpc_client); + let (current_validators, current_protocol_version) = + futures::try_join!(current_validators_future, current_protocol_version_future,)?; + self.current_validators.write().await.validators = current_validators; + self.current_protocol_version.write().await.protocol_version = current_protocol_version; Ok(()) } diff --git a/rpc-server/src/modules/blocks/utils.rs b/rpc-server/src/modules/blocks/utils.rs index 254acc7d..0115c901 100644 --- a/rpc-server/src/modules/blocks/utils.rs +++ b/rpc-server/src/modules/blocks/utils.rs @@ -43,11 +43,10 @@ pub async fn check_block_height( #[cfg_attr( feature = "tracing-instrumentation", - tracing::instrument(skip(s3_client)) + tracing::instrument(skip(fastnear_client)) )] -pub async fn fetch_chunk_from_s3( - s3_client: &near_lake_framework::LakeS3Client, - s3_bucket_name: &str, +pub async fn fetch_chunk_from_fastnear( + fastnear_client: &near_lake_framework::FastNearClient, block_height: near_primitives::types::BlockHeight, shard_id: near_primitives::types::ShardId, ) -> Result @@ -57,9 +56,8 @@ pub async fn fetch_chunk_from_s3( block_height, shard_id ); - match near_lake_framework::s3::fetchers::fetch_shard( - s3_client, - s3_bucket_name, + match near_lake_framework::fastnear::fetchers::fetch_shard_or_retry( + fastnear_client, block_height, shard_id.into(), ) @@ -164,7 +162,12 @@ pub async fn fetch_block_from_cache_or_get( } }; let cache_block = match block { - Some(block) => block, + Some(block) => { + crate::metrics::REQUESTS_BLOCKS_COUNTERS + .with_label_values(&[method_name, "cache"]) + .inc(); + block + } None => { let block_from_s3 = fetch_block(data, block_reference, method_name).await?; let block = CacheBlock::from(&block_from_s3.block_view); diff --git a/rpc-server/src/utils.rs b/rpc-server/src/utils.rs index c6df311b..b2cc32ab 100644 --- a/rpc-server/src/utils.rs +++ b/rpc-server/src/utils.rs @@ -1,7 +1,6 @@ use crate::modules::blocks::{BlockInfo, BlocksInfoByFinality, CacheBlock}; #[cfg(feature = "shadow-data-consistency")] use assert_json_diff::{assert_json_matches_no_panic, CompareMode, Config, NumericMode}; -use futures::StreamExt; #[cfg(feature = "shadow-data-consistency")] const DEFAULT_RETRY_COUNT: u8 = 3; @@ -106,37 +105,6 @@ impl JsonRpcClient { } } -pub async fn get_final_block( - near_rpc_client: &JsonRpcClient, - optimistic: bool, -) -> anyhow::Result { - let block_request_method = near_jsonrpc_client::methods::block::RpcBlockRequest { - block_reference: near_primitives::types::BlockReference::Finality(if optimistic { - near_primitives::types::Finality::None - } else { - near_primitives::types::Finality::Final - }), - }; - let block_view = near_rpc_client.call(block_request_method, None).await?; - - // Updating the metric to expose the block height considered as final by the server - // this metric can be used to calculate the lag between the server and the network - // Prometheus Gauge Metric type do not support u64 - // https://github.com/tikv/rust-prometheus/issues/470 - if optimistic { - // optimistic block height - crate::metrics::LATEST_BLOCK_HEIGHT_BY_FINALITIY - .with_label_values(&["optimistic"]) - .set(i64::try_from(block_view.header.height)?); - } else { - // final block height - crate::metrics::LATEST_BLOCK_HEIGHT_BY_FINALITIY - .with_label_values(&["final"]) - .set(i64::try_from(block_view.header.height)?); - } - Ok(block_view) -} - pub async fn get_current_validators( near_rpc_client: &JsonRpcClient, ) -> anyhow::Result { @@ -146,20 +114,28 @@ pub async fn get_current_validators( Ok(near_rpc_client.call(params, None).await?) } +pub async fn get_current_protocol_version( + near_rpc_client: &JsonRpcClient, +) -> anyhow::Result { + let params = near_jsonrpc_client::methods::status::RpcStatusRequest; + let protocol_version = near_rpc_client.call(params, None).await?.protocol_version; + Ok(protocol_version) +} + async fn handle_streamer_message( streamer_message: near_indexer_primitives::StreamerMessage, blocks_cache: std::sync::Arc>, blocks_info_by_finality: std::sync::Arc, near_rpc_client: &JsonRpcClient, ) -> anyhow::Result<()> { - let block = BlockInfo::new_from_streamer_message(streamer_message).await; - let block_cache = block.block_cache; - - if block_cache.block_height as i64 + if streamer_message.block.header.height as i64 > crate::metrics::LATEST_BLOCK_HEIGHT_BY_FINALITIY .with_label_values(&["final"]) .get() { + let block = BlockInfo::new_from_streamer_message(streamer_message).await; + let block_cache = block.block_cache; + if blocks_info_by_finality.final_cache_block().await.epoch_id != block_cache.epoch_id { tracing::info!("New epoch started: {:?}", block_cache.epoch_id); blocks_info_by_finality @@ -178,143 +154,87 @@ async fn handle_streamer_message( Ok(()) } -pub async fn update_final_block_regularly_from_lake( - blocks_cache: std::sync::Arc>, - blocks_info_by_finality: std::sync::Arc, - rpc_server_config: configuration::RpcServerConfig, - near_rpc_client: JsonRpcClient, -) -> anyhow::Result<()> { - tracing::info!("Task to get final block from lake and store in the cache started"); - let lake_config = rpc_server_config - .lake_config - .lake_config( - blocks_info_by_finality - .optimistic_cache_block() - .await - .block_height, - ) - .await?; - let (sender, stream) = near_lake_framework::streamer(lake_config); - let mut handlers = tokio_stream::wrappers::ReceiverStream::new(stream) - .map(|streamer_message| { - handle_streamer_message( - streamer_message, - std::sync::Arc::clone(&blocks_cache), - std::sync::Arc::clone(&blocks_info_by_finality), - &near_rpc_client, - ) - }) - .buffer_unordered(1usize); - - while let Some(_handle_message) = handlers.next().await { - if let Err(err) = _handle_message { - tracing::warn!("{:?}", err); - } - } - drop(handlers); // close the channel so the sender will stop - - // propagate errors from the sender - match sender.await { - Ok(Ok(())) => Ok(()), - Ok(Err(e)) => Err(e), - Err(e) => Err(anyhow::Error::from(e)), // JoinError - } -} - // Task to get and store final block in the cache -// Subscribe to the redis channel and update the final block in the cache -pub async fn update_final_block_regularly_from_redis( +async fn task_update_final_block_regularly( blocks_cache: std::sync::Arc>, blocks_info_by_finality: std::sync::Arc, - finality_blocks_storage: cache_storage::BlocksByFinalityCache, + fastnear_client: near_lake_framework::FastNearClient, near_rpc_client: JsonRpcClient, ) { tracing::info!("Task to get and store final block in the cache started"); - let mut current_protocol_version = blocks_info_by_finality.current_protocol_version().await; + let mut final_block_height = blocks_info_by_finality + .final_cache_block() + .await + .block_height; loop { - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - - // Update protocol version from redis regularly - match finality_blocks_storage.get_protocol_version().await { - Ok(protocol_version) => { - if protocol_version != current_protocol_version { - if let Err(err) = blocks_info_by_finality - .update_current_protocol_version(protocol_version) - .await - { - tracing::error!("Failed to update protocol version from Redis: {:?}", err); - } else { - // If the protocol version is updated from the Redis, update the local value - // otherwise, we will keep trying to update from the Redis - current_protocol_version = protocol_version; - }; - }; - } - Err(err) => { - tracing::error!("Failed to get protocol version from Redis: {:?}", err); - } - } - - // Update final block from Redis regularly - match finality_blocks_storage - .get_block_by_finality(near_primitives::types::Finality::Final) + final_block_height += 1; + if let Some(streamer_message) = + near_lake_framework::fastnear::fetchers::fetch_streamer_message( + &fastnear_client, + final_block_height, + ) .await { - Ok(streamer_message) => { - if let Err(err) = handle_streamer_message( - streamer_message, - std::sync::Arc::clone(&blocks_cache), - std::sync::Arc::clone(&blocks_info_by_finality), - &near_rpc_client, - ) - .await - { - tracing::error!("Error to handle_streamer_message: {:?}", err); - } - } - Err(err) => { - tracing::warn!("Error to get final block from redis: {:?}", err); - } + if let Err(err) = handle_streamer_message( + streamer_message, + std::sync::Arc::clone(&blocks_cache), + std::sync::Arc::clone(&blocks_info_by_finality), + &near_rpc_client, + ) + .await + { + tracing::error!("Error in fn handle_streamer_message(): {:?}", err); + }; } } } // Task to get and store optimistic block in the cache -// Subscribe to the redis channel and update the optimistic block in the cache -pub async fn update_optimistic_block_regularly( +async fn task_update_optimistic_block_regularly( blocks_info_by_finality: std::sync::Arc, - finality_blocks_storage: cache_storage::BlocksByFinalityCache, + fastnear_client: near_lake_framework::FastNearClient, ) { tracing::info!("Task to get and store optimistic block in the cache started"); + let mut optimistic_block_height = blocks_info_by_finality + .optimistic_cache_block() + .await + .block_height; loop { - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - match finality_blocks_storage - .get_block_by_finality(near_primitives::types::Finality::None) + optimistic_block_height += 1; + if let Some(streamer_message) = + near_lake_framework::fastnear::fetchers::fetch_optimistic_block_by_height( + &fastnear_client, + optimistic_block_height, + ) .await { - Ok(streamer_message) => { + if streamer_message.block.header.height as i64 + > crate::metrics::LATEST_BLOCK_HEIGHT_BY_FINALITIY + .with_label_values(&["optimistic"]) + .get() + { let optimistic_block = BlockInfo::new_from_streamer_message(streamer_message).await; let optimistic_block_cache = optimistic_block.block_cache; - if optimistic_block_cache.block_height as i64 - > crate::metrics::LATEST_BLOCK_HEIGHT_BY_FINALITIY - .with_label_values(&["optimistic"]) - .get() - { - blocks_info_by_finality - .update_optimistic_block(optimistic_block) - .await; - crate::metrics::LATEST_BLOCK_HEIGHT_BY_FINALITIY - .with_label_values(&["optimistic"]) - .set( - i64::try_from(optimistic_block_cache.block_height) - .expect("Invalid optimistic block height"), - ); - } - } - Err(err) => { - tracing::warn!("Error to get optimistic block from redis: {:?}", err); + blocks_info_by_finality + .update_optimistic_block(optimistic_block) + .await; + crate::metrics::LATEST_BLOCK_HEIGHT_BY_FINALITIY + .with_label_values(&["optimistic"]) + .set( + i64::try_from(optimistic_block_cache.block_height) + .expect("Invalid optimistic block height"), + ); } } + } +} + +// Task to check optimistic block status +async fn task_optimistic_block_status() { + tracing::info!("Task to check optimistic block status started"); + loop { + // check every second + tokio::time::sleep(std::time::Duration::from_secs(1)).await; // When an optimistic block is not updated, or it is lower than the final block // we need to mark that optimistic updating is not working @@ -344,10 +264,39 @@ pub async fn update_optimistic_block_regularly( { crate::metrics::OPTIMISTIC_UPDATING.set_working(); tracing::info!("Optimistic block updating is resumed."); - }; + } } } +pub async fn task_regularly_update_blocks_by_finality( + blocks_info_by_finality: std::sync::Arc, + blocks_cache: std::sync::Arc>, + fastnear_client: near_lake_framework::FastNearClient, + near_rpc_client: JsonRpcClient, +) { + // Task update final block regularly + let blocks_info_by_finality_clone = std::sync::Arc::clone(&blocks_info_by_finality); + let fastnear_client_clone = fastnear_client.clone(); + + tokio::spawn(async move { + task_update_final_block_regularly( + blocks_cache, + blocks_info_by_finality_clone, + fastnear_client_clone, + near_rpc_client, + ) + .await + }); + + // Task update optimistic block regularly + tokio::spawn(async move { + task_update_optimistic_block_regularly(blocks_info_by_finality, fastnear_client).await + }); + + // Task to check the optimistic block status + tokio::spawn(async move { task_optimistic_block_status().await }); +} + /// Convert gigabytes to bytes pub(crate) async fn gigabytes_to_bytes(gigabytes: f64) -> usize { (gigabytes * 1024.0 * 1024.0 * 1024.0) as usize diff --git a/state-indexer/src/main.rs b/state-indexer/src/main.rs index 170ad1e1..879661df 100644 --- a/state-indexer/src/main.rs +++ b/state-indexer/src/main.rs @@ -36,7 +36,10 @@ async fn main() -> anyhow::Result<()> { ) .await?; - let lake_config = indexer_config.lake_config.lake_config(start_block_height).await?; + let lake_config = indexer_config + .lake_config + .lake_config(start_block_height, indexer_config.general.chain_id.clone()) + .await?; let (sender, stream) = near_lake_framework::streamer(lake_config); // Initiate metrics http server diff --git a/tx-indexer/src/main.rs b/tx-indexer/src/main.rs index 4af238b5..98c471f8 100644 --- a/tx-indexer/src/main.rs +++ b/tx-indexer/src/main.rs @@ -66,7 +66,7 @@ async fn main() -> anyhow::Result<()> { tracing::info!(target: INDEXER, "Generating LakeConfig..."); let lake_config = indexer_config .lake_config - .lake_config(start_block_height) + .lake_config(start_block_height, indexer_config.general.chain_id.clone()) .await?; tracing::info!(target: INDEXER, "Creating cache storage...");