From c9ec0b44d891f28d903bc059742665a062f666e4 Mon Sep 17 00:00:00 2001 From: Yurii Koba <kobayurii88@gmail.com> Date: Wed, 2 Oct 2024 17:26:51 +0300 Subject: [PATCH 1/9] migrate to fastnear --- Cargo.lock | 4 - cache-storage/src/lib.rs | 87 ---------- configuration/Cargo.toml | 3 - configuration/src/configs/lake.rs | 63 +------- configuration/src/default_env_configs.rs | 6 + near-state-indexer/Cargo.toml | 1 - near-state-indexer/src/main.rs | 24 +-- near-state-indexer/src/utils.rs | 110 ------------- rpc-server/src/config.rs | 44 +++-- rpc-server/src/main.rs | 56 ++----- rpc-server/src/modules/blocks/methods.rs | 65 +++----- rpc-server/src/modules/blocks/mod.rs | 27 ++-- rpc-server/src/modules/blocks/utils.rs | 12 +- rpc-server/src/utils.rs | 195 +++++++++-------------- 14 files changed, 158 insertions(+), 539 deletions(-) delete mode 100644 near-state-indexer/src/utils.rs diff --git a/Cargo.lock b/Cargo.lock index 3bdc2130..99b4176e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1574,9 +1574,6 @@ name = "configuration" version = "0.3.3" dependencies = [ "anyhow", - "aws-credential-types", - "aws-sdk-s3", - "aws-types", "dotenv", "google-cloud-storage", "lazy_static", @@ -5108,7 +5105,6 @@ dependencies = [ "actix", "actix-web", "anyhow", - "cache-storage", "clap", "configuration", "database", diff --git a/cache-storage/src/lib.rs b/cache-storage/src/lib.rs index c9e18964..a41fb0bd 100644 --- a/cache-storage/src/lib.rs +++ b/cache-storage/src/lib.rs @@ -1,5 +1,4 @@ use futures::FutureExt; -use near_indexer_primitives::near_primitives; mod utils; @@ -115,92 +114,6 @@ impl RedisCacheStorage { } } -#[derive(Clone)] -pub struct BlocksByFinalityCache { - cache_storage: RedisCacheStorage, -} - -/// Sets the keys in Redis shared between the ReadRPC components about the most recent -/// blocks based on finalities (final or optimistic). -/// `final_height` of `optimistic_height` depending on `block_type` passed. -/// Additionally, sets the JSON serialized `StreamerMessage` into keys `final` or `optimistic` -/// accordingly. -impl BlocksByFinalityCache { - // Use redis database 0(default for redis) for handling the blocks by finality cache. - pub async fn new(redis_url: String) -> anyhow::Result<Self> { - Ok(Self { - cache_storage: RedisCacheStorage::new(redis_url, 0).await?, - }) - } - - pub async fn update_block_by_finality( - &self, - finality: near_indexer_primitives::near_primitives::types::Finality, - streamer_message: &near_indexer_primitives::StreamerMessage, - ) -> anyhow::Result<()> { - let block_height = streamer_message.block.header.height; - let block_type = serde_json::to_string(&finality)?; - - let last_height = self - .cache_storage - .get(format!("{}_height", block_type)) - .await - .unwrap_or(0); - - // If the block height is greater than the last height, update the block streamer message - // if we have a few indexers running, we need to make sure that we are not updating the same block - // or block which is already processed or block less than the last processed block - if block_height > last_height { - let json_streamer_message = serde_json::to_string(streamer_message)?; - // Update the last block height - // Create a clone of the redis client and redis cmd to avoid borrowing issues - let update_height_feature = self - .cache_storage - .set(format!("{}_height", block_type), block_height); - - // Update the block streamer message - // Create a clone of the redis client and redis cmd to avoid borrowing issues - let update_stream_msg_feature = - self.cache_storage.set(block_type, json_streamer_message); - - // Wait for both futures to complete - futures::future::join_all([ - update_height_feature.boxed(), - update_stream_msg_feature.boxed(), - ]) - .await - .into_iter() - .collect::<anyhow::Result<()>>()?; - }; - - Ok(()) - } - - pub async fn get_block_by_finality( - &self, - finality: near_indexer_primitives::near_primitives::types::Finality, - ) -> anyhow::Result<near_indexer_primitives::StreamerMessage> { - let block_type = serde_json::to_string(&finality)?; - let resp: String = self.cache_storage.get(block_type).await?; - Ok(serde_json::from_str(&resp)?) - } - - pub async fn update_protocol_version( - &self, - protocol_version: near_primitives::types::ProtocolVersion, - ) -> anyhow::Result<()> { - self.cache_storage - .set("protocol_version", protocol_version) - .await - } - - pub async fn get_protocol_version( - &self, - ) -> anyhow::Result<near_primitives::types::ProtocolVersion> { - self.cache_storage.get("protocol_version").await - } -} - #[derive(Clone)] pub struct TxIndexerCache { cache_storage: RedisCacheStorage, diff --git a/configuration/Cargo.toml b/configuration/Cargo.toml index 6803c0dc..70da9cb4 100644 --- a/configuration/Cargo.toml +++ b/configuration/Cargo.toml @@ -9,9 +9,6 @@ license.workspace = true [dependencies] anyhow = "1.0.70" -aws-credential-types = "1.1.4" -aws-sdk-s3 = { version = "1.14.0", features = ["behavior-version-latest"] } -aws-types = "1.1.4" dotenv = "0.15.0" google-cloud-storage = "0.23.0" lazy_static = "1.4.0" diff --git a/configuration/src/configs/lake.rs b/configuration/src/configs/lake.rs index f8d2335b..caf5de5c 100644 --- a/configuration/src/configs/lake.rs +++ b/configuration/src/configs/lake.rs @@ -1,86 +1,37 @@ -use aws_sdk_s3::config::StalledStreamProtectionConfig; use near_lake_framework::near_indexer_primitives::near_primitives; use serde_derive::Deserialize; -use crate::configs::{deserialize_optional_data_or_env, required_value_or_panic}; +use crate::configs::deserialize_optional_data_or_env; #[derive(Debug, Clone)] pub struct LakeConfig { - pub aws_access_key_id: String, - pub aws_secret_access_key: String, - pub aws_default_region: String, - pub aws_bucket_name: String, + pub num_threads: Option<u64>, } impl LakeConfig { - pub async fn s3_config(&self) -> aws_sdk_s3::Config { - let credentials = aws_credential_types::Credentials::new( - &self.aws_access_key_id, - &self.aws_secret_access_key, - None, - None, - "", - ); - aws_sdk_s3::Config::builder() - .stalled_stream_protection(StalledStreamProtectionConfig::disabled()) - .credentials_provider(credentials) - .region(aws_types::region::Region::new( - self.aws_default_region.clone(), - )) - .build() - } - pub async fn lake_config( &self, start_block_height: near_primitives::types::BlockHeight, - ) -> anyhow::Result<near_lake_framework::LakeConfig> { - let config_builder = near_lake_framework::LakeConfigBuilder::default(); + ) -> anyhow::Result<near_lake_framework::FastNearConfig> { + let config_builder = near_lake_framework::FastNearConfigBuilder::default(); Ok(config_builder - .s3_config(self.s3_config().await) - .s3_region_name(&self.aws_default_region) - .s3_bucket_name(&self.aws_bucket_name) + .mainnet() .start_block_height(start_block_height) .build() .expect("Failed to build LakeConfig")) } - - pub async fn lake_s3_client(&self) -> near_lake_framework::LakeS3Client { - let s3_config = self.s3_config().await; - near_lake_framework::LakeS3Client::new(aws_sdk_s3::Client::from_conf(s3_config)) - } } #[derive(Deserialize, Debug, Clone, Default)] pub struct CommonLakeConfig { #[serde(deserialize_with = "deserialize_optional_data_or_env", default)] - pub aws_access_key_id: Option<String>, - #[serde(deserialize_with = "deserialize_optional_data_or_env", default)] - pub aws_secret_access_key: Option<String>, - #[serde(deserialize_with = "deserialize_optional_data_or_env", default)] - pub aws_default_region: Option<String>, - #[serde(deserialize_with = "deserialize_optional_data_or_env", default)] - pub aws_bucket_name: Option<String>, + pub num_threads: Option<u64>, } impl From<CommonLakeConfig> for LakeConfig { fn from(common_config: CommonLakeConfig) -> Self { Self { - aws_access_key_id: required_value_or_panic( - "aws_access_key_id", - common_config.aws_access_key_id, - ), - aws_secret_access_key: required_value_or_panic( - "aws_secret_access_key", - common_config.aws_secret_access_key, - ), - aws_default_region: required_value_or_panic( - "aws_default_region", - common_config.aws_default_region, - ), - aws_bucket_name: required_value_or_panic( - "aws_bucket_name", - common_config.aws_bucket_name, - ), + num_threads: common_config.num_threads, } } } diff --git a/configuration/src/default_env_configs.rs b/configuration/src/default_env_configs.rs index 469a96de..e3dec048 100644 --- a/configuration/src/default_env_configs.rs +++ b/configuration/src/default_env_configs.rs @@ -120,6 +120,7 @@ tracked_changes = "${TRACKED_CHANGES}" ### Lake framework configuration [lake_config] +<<<<<<< HEAD:configuration/src/default_env_configs.rs ## Lake framework AWS access key id aws_access_key_id = "${AWS_ACCESS_KEY_ID}" @@ -132,6 +133,11 @@ aws_default_region = "${AWS_DEFAULT_REGION}" ## Lake framework bucket name aws_bucket_name = "${AWS_BUCKET_NAME}" +======= +# Number of threads to use for fetching data from fatnear +# Default: 2 * available threads +#num_threads = 8 +>>>>>>> 6909c65 (migrate to fastnear):configuration/example.config.toml ## Transaction details are stored in the Google Cloud Storage [tx_details_storage] diff --git a/near-state-indexer/Cargo.toml b/near-state-indexer/Cargo.toml index 700d3360..185921db 100644 --- a/near-state-indexer/Cargo.toml +++ b/near-state-indexer/Cargo.toml @@ -29,7 +29,6 @@ tokio = { version = "1.36.0", features = [ tokio-stream = "0.1" tracing = "0.1.34" -cache-storage.workspace = true configuration.workspace = true database.workspace = true logic-state-indexer.workspace = true diff --git a/near-state-indexer/src/main.rs b/near-state-indexer/src/main.rs index 3d6c1abe..61228781 100644 --- a/near-state-indexer/src/main.rs +++ b/near-state-indexer/src/main.rs @@ -8,7 +8,6 @@ use logic_state_indexer::{handle_streamer_message, NearClient, INDEXER}; mod configs; mod metrics; mod near_client; -mod utils; #[actix_web::main] async fn main() -> anyhow::Result<()> { @@ -59,12 +58,6 @@ async fn run(home_dir: std::path::PathBuf) -> anyhow::Result<()> { let state_indexer_config = configuration::read_configuration::<configuration::NearStateIndexerConfig>().await?; - tracing::info!(target: INDEXER, "Connecting to redis..."); - let finality_blocks_storage = cache_storage::BlocksByFinalityCache::new( - state_indexer_config.general.redis_url.to_string(), - ) - .await?; - tracing::info!(target: INDEXER, "Setup near_indexer..."); let indexer_config = near_indexer::IndexerConfig { home_dir, @@ -80,7 +73,7 @@ async fn run(home_dir: std::path::PathBuf) -> anyhow::Result<()> { // Regular indexer process starts here tracing::info!(target: INDEXER, "Instantiating the stream..."); let stream = indexer.streamer(); - let (view_client, client) = indexer.client_actors(); + let (view_client, _) = indexer.client_actors(); let genesis_config = indexer.near_config().genesis.config.clone(); let shard_layout = logic_state_indexer::configs::shard_layout(genesis_config).await?; @@ -99,21 +92,6 @@ async fn run(home_dir: std::path::PathBuf) -> anyhow::Result<()> { near_client.clone(), )); - // Initiate the job of updating the optimistic blocks to Redis - tokio::spawn(utils::update_block_in_redis_by_finality( - view_client.clone(), - client.clone(), - finality_blocks_storage.clone(), - near_indexer_primitives::near_primitives::types::Finality::None, - )); - // And the same job for the final blocks - tokio::spawn(utils::update_block_in_redis_by_finality( - view_client.clone(), - client.clone(), - finality_blocks_storage.clone(), - near_indexer_primitives::near_primitives::types::Finality::Final, - )); - // ! Note that the `handle_streamer_message` doesn't interact with the Redis tracing::info!(target: INDEXER, "Starting near_state_indexer..."); let mut handlers = tokio_stream::wrappers::ReceiverStream::new(stream) diff --git a/near-state-indexer/src/utils.rs b/near-state-indexer/src/utils.rs deleted file mode 100644 index 9d517c99..00000000 --- a/near-state-indexer/src/utils.rs +++ /dev/null @@ -1,110 +0,0 @@ -use near_indexer::near_primitives; -use near_o11y::WithSpanContextExt; - -const INTERVAL: std::time::Duration = std::time::Duration::from_secs(1); - -/// The universal function that fetches the block by the given finality. -/// It is used in the `update_block_in_redis_by_finality` function. -/// ! The function does not support the DoomSlug finality. -pub(crate) async fn fetch_block_by_finality( - client: &actix::Addr<near_client::ViewClientActor>, - finality: &near_primitives::types::Finality, -) -> anyhow::Result<near_primitives::views::BlockView> { - let block_reference = near_primitives::types::BlockReference::Finality(finality.clone()); - Ok(client - .send(near_client::GetBlock(block_reference).with_span_context()) - .await??) -} - -pub(crate) async fn fetch_status( - client: &actix::Addr<near_client::ClientActor>, -) -> anyhow::Result<near_primitives::views::StatusResponse> { - tracing::debug!(target: crate::INDEXER, "Fetching status"); - Ok(client - .send( - near_client::Status { - is_health_check: false, - detailed: false, - } - .with_span_context(), - ) - .await??) -} - -/// This function starts a busy-loop that does the similar job to the near-indexer one. -/// However, this one deals with the blocks by provided finality, and instead of streaming them to -/// the client, it stores the block directly to the Redis instance shared between -/// ReadRPC components. -pub async fn update_block_in_redis_by_finality( - view_client: actix::Addr<near_client::ViewClientActor>, - client: actix::Addr<near_client::ClientActor>, - finality_blocks_storage: cache_storage::BlocksByFinalityCache, - finality: near_primitives::types::Finality, -) { - let block_type = serde_json::to_string(&finality).unwrap(); - tracing::info!(target: crate::INDEXER, "Starting [{}] block update job...", block_type); - - let mut last_stored_block_height: Option<near_primitives::types::BlockHeight> = None; - loop { - tokio::time::sleep(INTERVAL).await; - - if let Ok(status) = fetch_status(&client).await { - // Update protocol version in Redis - // This is need for read-rpc to know the current protocol version - if let Err(err) = finality_blocks_storage - .update_protocol_version(status.protocol_version) - .await - { - tracing::error!( - target: crate::INDEXER, - "Failed to update protocol version in Redis: {:?}", err - ); - }; - - // If the node is not fully synced the optimistic blocks are outdated - // and are useless for our case. To avoid any misleading in our Redis - // we don't update blocks until the node is fully synced. - if status.sync_info.syncing { - continue; - } - } - - if let Ok(block) = fetch_block_by_finality(&view_client, &finality).await { - let height = block.header.height; - if let Some(block_height) = last_stored_block_height { - if height <= block_height { - continue; - } else { - last_stored_block_height = Some(height); - } - } else { - last_stored_block_height = Some(height); - }; - let response = near_indexer::build_streamer_message(&view_client, block).await; - match response { - Ok(streamer_message) => { - tracing::debug!(target: crate::INDEXER, "[{}] block {:?}", block_type, last_stored_block_height); - if let Err(err) = finality_blocks_storage - .update_block_by_finality( - near_primitives::types::Finality::None, - &streamer_message, - ) - .await - { - tracing::error!( - target: crate::INDEXER, - "Failed to publish [{}] block streamer message: {:?}", block_type, err - ); - }; - } - Err(err) => { - tracing::error!( - target: crate::INDEXER, - "Missing data, skipping block #{}...", height - ); - tracing::error!(target: crate::INDEXER, "{:#?}", err); - } - } - }; - } -} diff --git a/rpc-server/src/config.rs b/rpc-server/src/config.rs index 85cefaef..b465bdce 100644 --- a/rpc-server/src/config.rs +++ b/rpc-server/src/config.rs @@ -1,6 +1,7 @@ use std::string::ToString; use futures::executor::block_on; +use near_lake_framework::FastNearClient; use near_primitives::epoch_manager::{AllEpochConfig, EpochConfig}; use crate::modules::blocks::{BlocksInfoByFinality, CacheBlock}; @@ -20,8 +21,7 @@ pub struct GenesisInfo { impl GenesisInfo { pub async fn get( near_rpc_client: &crate::utils::JsonRpcClient, - s3_client: &near_lake_framework::LakeS3Client, - s3_bucket_name: &str, + fastnear_client: &near_lake_framework::FastNearClient, ) -> Self { tracing::info!("Get genesis config..."); let genesis_config = near_rpc_client @@ -31,18 +31,14 @@ impl GenesisInfo { ) .await .expect("Error to get genesis config"); - - let genesis_block = near_lake_framework::s3::fetchers::fetch_block( - s3_client, - s3_bucket_name, - genesis_config.genesis_height, - ) - .await - .expect("Error to get genesis block"); + + let genesis_block = + near_lake_framework::fastnear::fetchers::fetch_first_block(fastnear_client) + .await; Self { genesis_config, - genesis_block_cache: CacheBlock::from(&genesis_block), + genesis_block_cache: CacheBlock::from(&genesis_block.block), } } } @@ -50,7 +46,7 @@ impl GenesisInfo { #[derive(Clone)] pub struct ServerContext { /// Lake s3 client - pub s3_client: near_lake_framework::LakeS3Client, + pub fastnear_client: near_lake_framework::FastNearClient, /// Database manager pub db_manager: std::sync::Arc<Box<dyn database::ReaderDbManager + Sync + Send + 'static>>, /// TransactionDetails storage @@ -61,8 +57,6 @@ pub struct ServerContext { pub genesis_info: GenesisInfo, /// Near rpc client pub near_rpc_client: crate::utils::JsonRpcClient, - /// AWS s3 lake bucket name - pub s3_bucket_name: String, /// Blocks cache pub blocks_cache: std::sync::Arc<crate::cache::RwLockLruMemoryCache<u64, CacheBlock>>, /// Final block info include final_block_cache and current_validators_info @@ -109,7 +103,17 @@ impl ServerContext { let blocks_info_by_finality = std::sync::Arc::new(BlocksInfoByFinality::new(&near_rpc_client, &blocks_cache).await); - let s3_client = rpc_server_config.lake_config.lake_s3_client().await; + let fastnear_config = rpc_server_config + .lake_config + .lake_config( + blocks_info_by_finality + .optimistic_cache_block() + .await + .block_height, + ) + .await?; + + let fastnear_client = FastNearClient::from_conf(&fastnear_config); let tx_details_storage = tx_details_storage::TxDetailsStorage::new( rpc_server_config.tx_details_storage.storage_client().await, @@ -124,12 +128,7 @@ impl ServerContext { }) .ok(); - let genesis_info = GenesisInfo::get( - &near_rpc_client, - &s3_client, - &rpc_server_config.lake_config.aws_bucket_name, - ) - .await; + let genesis_info = GenesisInfo::get(&near_rpc_client, &fastnear_client).await; let default_epoch_config = EpochConfig::from(&genesis_info.genesis_config); let all_epoch_config = AllEpochConfig::new( @@ -159,13 +158,12 @@ impl ServerContext { .inc(); Ok(Self { - s3_client, + fastnear_client, db_manager: std::sync::Arc::new(Box::new(db_manager)), tx_details_storage: std::sync::Arc::new(tx_details_storage), tx_cache_storage, genesis_info, near_rpc_client, - s3_bucket_name: rpc_server_config.lake_config.aws_bucket_name.clone(), blocks_cache, blocks_info_by_finality, compiled_contract_code_cache, diff --git a/rpc-server/src/main.rs b/rpc-server/src/main.rs index 11d869d8..774200de 100644 --- a/rpc-server/src/main.rs +++ b/rpc-server/src/main.rs @@ -344,63 +344,27 @@ async fn main() -> anyhow::Result<()> { config::ServerContext::init(rpc_server_config.clone(), near_rpc_client.clone()).await?, ); + // Update final block from fastnear let blocks_cache_clone = std::sync::Arc::clone(&server_context.blocks_cache); let blocks_info_by_finality_clone = std::sync::Arc::clone(&server_context.blocks_info_by_finality); - let near_rpc_client_clone = near_rpc_client.clone(); - - let finality_blocks_storage = - cache_storage::BlocksByFinalityCache::new(rpc_server_config.general.redis_url.to_string()) - .await - .map_err(|err| { - crate::metrics::OPTIMISTIC_UPDATING.set_not_working(); - tracing::warn!("Failed to connect to Redis: {:?}", err); - }) - .ok(); - - // We need to update final block from Redis and Lake - // Because we can't be sure that Redis has the latest block - // And Lake can be used as a backup source - - // Update final block from Redis if Redis is available - if let Some(finality_blocks_storage) = finality_blocks_storage.clone() { - tokio::spawn(async move { - utils::update_final_block_regularly_from_redis( - blocks_cache_clone, - blocks_info_by_finality_clone, - finality_blocks_storage, - near_rpc_client_clone, - ) - .await - }); - } - - // Update final block from Lake - let blocks_cache_clone = std::sync::Arc::clone(&server_context.blocks_cache); - let blocks_info_by_finality_clone = - std::sync::Arc::clone(&server_context.blocks_info_by_finality); + let fastnear_client = server_context.fastnear_client.clone(); tokio::spawn(async move { - utils::update_final_block_regularly_from_lake( + utils::update_final_block_regularly( blocks_cache_clone, blocks_info_by_finality_clone, - rpc_server_config, + fastnear_client, near_rpc_client, ) .await }); - // Update optimistic block from Redis if Redis is available - if let Some(finality_blocks_storage) = finality_blocks_storage { - let blocks_info_by_finality = - std::sync::Arc::clone(&server_context.blocks_info_by_finality); - tokio::spawn(async move { - utils::update_optimistic_block_regularly( - blocks_info_by_finality, - finality_blocks_storage, - ) - .await - }); - } + // Update optimistic block from fastnear + let blocks_info_by_finality = std::sync::Arc::clone(&server_context.blocks_info_by_finality); + let fastnear_client = server_context.fastnear_client.clone(); + tokio::spawn(async move { + utils::update_optimistic_block_regularly(blocks_info_by_finality, fastnear_client).await + }); actix_web::HttpServer::new(move || { let cors = actix_cors::Cors::permissive(); diff --git a/rpc-server/src/modules/blocks/methods.rs b/rpc-server/src/modules/blocks/methods.rs index 5a7a5dfb..5250758f 100644 --- a/rpc-server/src/modules/blocks/methods.rs +++ b/rpc-server/src/modules/blocks/methods.rs @@ -4,7 +4,7 @@ use near_primitives::views::StateChangeValueView; use crate::config::ServerContext; use crate::modules::blocks::utils::{ - check_block_height, fetch_block_from_cache_or_get, fetch_chunk_from_s3, is_matching_change, + check_block_height, fetch_block_from_cache_or_get, fetch_chunk_from_fastnear, is_matching_change, }; /// `block` rpc method implementation @@ -411,18 +411,17 @@ pub async fn fetch_block( { data.blocks_info_by_finality.optimistic_block_view().await } else { - near_lake_framework::s3::fetchers::fetch_block( - &data.s3_client, - &data.s3_bucket_name, + near_lake_framework::fastnear::fetchers::fetch_block_or_retry( + &data.fastnear_client, block_height, ) - .await - .map_err(|err| { - tracing::error!("Failed to fetch block from S3: {}", err); - near_jsonrpc::primitives::types::blocks::RpcBlockError::UnknownBlock { - error_message: format!("BLOCK HEIGHT: {:?}", block_height), - } - })? + .await + .map_err(|err| { + tracing::error!("Failed to fetch block from fastnear: {}", err); + near_jsonrpc::primitives::types::blocks::RpcBlockError::UnknownBlock { + error_message: format!("BLOCK HEIGHT: {:?}", block_height), + } + })? }; Ok(near_jsonrpc::primitives::types::blocks::RpcBlockResponse { block_view }) } @@ -484,13 +483,8 @@ pub async fn fetch_chunk( ) .map(|block_height_shard_id| (block_height_shard_id.0, block_height_shard_id.1))?, }; - let chunk_view = fetch_chunk_from_s3( - &data.s3_client, - &data.s3_bucket_name, - block_height, - shard_id.into(), - ) - .await?; + let chunk_view = + fetch_chunk_from_fastnear(&data.fastnear_client, block_height, shard_id).await?; // increase block category metrics crate::metrics::increase_request_category_metrics( data, @@ -664,27 +658,16 @@ async fn fetch_shards_by_cache_block( data: &Data<ServerContext>, cache_block: crate::modules::blocks::CacheBlock, ) -> anyhow::Result<Vec<near_indexer_primitives::IndexerShard>> { - let fetch_shards_futures = (0..cache_block.chunks_included) - .collect::<Vec<u64>>() - .into_iter() - .map(|shard_id| { - near_lake_framework::s3::fetchers::fetch_shard( - &data.s3_client, - &data.s3_bucket_name, - cache_block.block_height, - shard_id, - ) - }); - - futures::future::join_all(fetch_shards_futures) - .await - .into_iter() - .collect::<Result<_, _>>() - .map_err(|err| { - anyhow::anyhow!( - "Failed to fetch shards for block {} with error: {}", - cache_block.block_height, - err - ) - }) + match near_lake_framework::fastnear::fetchers::fetch_streamer_message( + &data.fastnear_client, + cache_block.block_height, + ) + .await + { + Some(streamer_message) => Ok(streamer_message.shards), + None => Err(anyhow::anyhow!( + "Failed to fetch shards for block {}", + cache_block.block_height, + )), + } } diff --git a/rpc-server/src/modules/blocks/mod.rs b/rpc-server/src/modules/blocks/mod.rs index e4fac836..246eb6c7 100644 --- a/rpc-server/src/modules/blocks/mod.rs +++ b/rpc-server/src/modules/blocks/mod.rs @@ -10,7 +10,6 @@ pub struct CacheBlock { pub block_timestamp: u64, pub gas_price: near_primitives::types::Balance, pub latest_protocol_version: near_primitives::types::ProtocolVersion, - pub chunks_included: u64, pub state_root: near_primitives::hash::CryptoHash, pub epoch_id: near_primitives::hash::CryptoHash, } @@ -61,7 +60,6 @@ impl From<&near_primitives::views::BlockView> for CacheBlock { block_timestamp: block.header.timestamp, gas_price: block.header.gas_price, latest_protocol_version: block.header.latest_protocol_version, - chunks_included: block.header.chunks_included, state_root: block.header.prev_state_root, epoch_id: block.header.epoch_id, } @@ -301,11 +299,13 @@ impl BlocksInfoByFinality { let final_block_future = crate::utils::get_final_block(near_rpc_client, false); let optimistic_block_future = crate::utils::get_final_block(near_rpc_client, true); let validators_future = crate::utils::get_current_validators(near_rpc_client); + let protocol_version_future = crate::utils::get_current_protocol_version(near_rpc_client); - let (final_block, optimistic_block, validators) = futures::try_join!( + let (final_block, optimistic_block, validators, protocol_version) = futures::try_join!( final_block_future, optimistic_block_future, validators_future, + protocol_version_future ) .map_err(|err| { tracing::error!("Error to fetch final block info: {:?}", err); @@ -327,7 +327,7 @@ impl BlocksInfoByFinality { optimistic_changes: futures_locks::RwLock::new(OptimisticChanges::new()), current_validators: futures_locks::RwLock::new(CurrentValidatorInfo { validators }), current_protocol_version: futures_locks::RwLock::new(CurrentProtocolVersion { - protocol_version: near_primitives::version::PROTOCOL_VERSION, + protocol_version, }), } } @@ -359,18 +359,13 @@ impl BlocksInfoByFinality { &self, near_rpc_client: &crate::utils::JsonRpcClient, ) -> anyhow::Result<()> { - self.current_validators.write().await.validators = - crate::utils::get_current_validators(near_rpc_client).await?; - Ok(()) - } - - // Update current protocol version in the cache. - // This method executes when the protocol version changes. - pub async fn update_current_protocol_version( - &self, - protocol_version: near_primitives::types::ProtocolVersion, - ) -> anyhow::Result<()> { - self.current_protocol_version.write().await.protocol_version = protocol_version; + let current_validators_future = crate::utils::get_current_validators(near_rpc_client); + let current_protocol_version_future = + crate::utils::get_current_protocol_version(near_rpc_client); + let (current_validators, current_protocol_version) = + futures::try_join!(current_validators_future, current_protocol_version_future,)?; + self.current_validators.write().await.validators = current_validators; + self.current_protocol_version.write().await.protocol_version = current_protocol_version; Ok(()) } diff --git a/rpc-server/src/modules/blocks/utils.rs b/rpc-server/src/modules/blocks/utils.rs index 254acc7d..1b8f7164 100644 --- a/rpc-server/src/modules/blocks/utils.rs +++ b/rpc-server/src/modules/blocks/utils.rs @@ -43,11 +43,10 @@ pub async fn check_block_height( #[cfg_attr( feature = "tracing-instrumentation", - tracing::instrument(skip(s3_client)) + tracing::instrument(skip(fastnear_client)) )] -pub async fn fetch_chunk_from_s3( - s3_client: &near_lake_framework::LakeS3Client, - s3_bucket_name: &str, +pub async fn fetch_chunk_from_fastnear( + fastnear_client: &near_lake_framework::FastNearClient, block_height: near_primitives::types::BlockHeight, shard_id: near_primitives::types::ShardId, ) -> Result<near_primitives::views::ChunkView, near_jsonrpc::primitives::types::chunks::RpcChunkError> @@ -57,9 +56,8 @@ pub async fn fetch_chunk_from_s3( block_height, shard_id ); - match near_lake_framework::s3::fetchers::fetch_shard( - s3_client, - s3_bucket_name, + match near_lake_framework::fastnear::fetchers::fetch_shard_or_retry( + fastnear_client, block_height, shard_id.into(), ) diff --git a/rpc-server/src/utils.rs b/rpc-server/src/utils.rs index c6df311b..ac2ec1ce 100644 --- a/rpc-server/src/utils.rs +++ b/rpc-server/src/utils.rs @@ -1,7 +1,6 @@ use crate::modules::blocks::{BlockInfo, BlocksInfoByFinality, CacheBlock}; #[cfg(feature = "shadow-data-consistency")] use assert_json_diff::{assert_json_matches_no_panic, CompareMode, Config, NumericMode}; -use futures::StreamExt; #[cfg(feature = "shadow-data-consistency")] const DEFAULT_RETRY_COUNT: u8 = 3; @@ -146,20 +145,27 @@ pub async fn get_current_validators( Ok(near_rpc_client.call(params, None).await?) } +pub async fn get_current_protocol_version( + near_rpc_client: &JsonRpcClient, +) -> anyhow::Result<near_primitives::version::ProtocolVersion> { + let params = near_jsonrpc_client::methods::status::RpcStatusRequest; + Ok(near_rpc_client.call(params, None).await?.protocol_version) +} + async fn handle_streamer_message( streamer_message: near_indexer_primitives::StreamerMessage, blocks_cache: std::sync::Arc<crate::cache::RwLockLruMemoryCache<u64, CacheBlock>>, blocks_info_by_finality: std::sync::Arc<BlocksInfoByFinality>, near_rpc_client: &JsonRpcClient, ) -> anyhow::Result<()> { - let block = BlockInfo::new_from_streamer_message(streamer_message).await; - let block_cache = block.block_cache; - - if block_cache.block_height as i64 + if streamer_message.block.header.height as i64 > crate::metrics::LATEST_BLOCK_HEIGHT_BY_FINALITIY .with_label_values(&["final"]) .get() { + let block = BlockInfo::new_from_streamer_message(streamer_message).await; + let block_cache = block.block_cache; + if blocks_info_by_finality.final_cache_block().await.epoch_id != block_cache.epoch_id { tracing::info!("New epoch started: {:?}", block_cache.epoch_id); blocks_info_by_finality @@ -178,142 +184,85 @@ async fn handle_streamer_message( Ok(()) } -pub async fn update_final_block_regularly_from_lake( - blocks_cache: std::sync::Arc<crate::cache::RwLockLruMemoryCache<u64, CacheBlock>>, - blocks_info_by_finality: std::sync::Arc<BlocksInfoByFinality>, - rpc_server_config: configuration::RpcServerConfig, - near_rpc_client: JsonRpcClient, -) -> anyhow::Result<()> { - tracing::info!("Task to get final block from lake and store in the cache started"); - let lake_config = rpc_server_config - .lake_config - .lake_config( - blocks_info_by_finality - .optimistic_cache_block() - .await - .block_height, - ) - .await?; - let (sender, stream) = near_lake_framework::streamer(lake_config); - let mut handlers = tokio_stream::wrappers::ReceiverStream::new(stream) - .map(|streamer_message| { - handle_streamer_message( - streamer_message, - std::sync::Arc::clone(&blocks_cache), - std::sync::Arc::clone(&blocks_info_by_finality), - &near_rpc_client, - ) - }) - .buffer_unordered(1usize); - - while let Some(_handle_message) = handlers.next().await { - if let Err(err) = _handle_message { - tracing::warn!("{:?}", err); - } - } - drop(handlers); // close the channel so the sender will stop - - // propagate errors from the sender - match sender.await { - Ok(Ok(())) => Ok(()), - Ok(Err(e)) => Err(e), - Err(e) => Err(anyhow::Error::from(e)), // JoinError - } -} - // Task to get and store final block in the cache -// Subscribe to the redis channel and update the final block in the cache -pub async fn update_final_block_regularly_from_redis( +pub async fn update_final_block_regularly( blocks_cache: std::sync::Arc<crate::cache::RwLockLruMemoryCache<u64, CacheBlock>>, blocks_info_by_finality: std::sync::Arc<BlocksInfoByFinality>, - finality_blocks_storage: cache_storage::BlocksByFinalityCache, + fastnear_client: near_lake_framework::FastNearClient, near_rpc_client: JsonRpcClient, ) { tracing::info!("Task to get and store final block in the cache started"); - let mut current_protocol_version = blocks_info_by_finality.current_protocol_version().await; + // let mut current_protocol_version = blocks_info_by_finality.current_protocol_version().await; loop { - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - + // TODO: // Update protocol version from redis regularly - match finality_blocks_storage.get_protocol_version().await { - Ok(protocol_version) => { - if protocol_version != current_protocol_version { - if let Err(err) = blocks_info_by_finality - .update_current_protocol_version(protocol_version) - .await - { - tracing::error!("Failed to update protocol version from Redis: {:?}", err); - } else { - // If the protocol version is updated from the Redis, update the local value - // otherwise, we will keep trying to update from the Redis - current_protocol_version = protocol_version; - }; - }; - } - Err(err) => { - tracing::error!("Failed to get protocol version from Redis: {:?}", err); - } - } - - // Update final block from Redis regularly - match finality_blocks_storage - .get_block_by_finality(near_primitives::types::Finality::Final) - .await + // match finality_blocks_storage.get_protocol_version().await { + // Ok(protocol_version) => { + // if protocol_version != current_protocol_version { + // if let Err(err) = blocks_info_by_finality + // .update_current_protocol_version(protocol_version) + // .await + // { + // tracing::error!("Failed to update protocol version from Redis: {:?}", err); + // } else { + // // If the protocol version is updated from the Redis, update the local value + // // otherwise, we will keep trying to update from the Redis + // current_protocol_version = protocol_version; + // }; + // }; + // } + // Err(err) => { + // tracing::error!("Failed to get protocol version from Redis: {:?}", err); + // } + // } + + // Update final block from fastnear regularly + let streamer_message = + near_lake_framework::fastnear::fetchers::fetch_last_block(&fastnear_client) + .await; + if let Err(err) = handle_streamer_message( + streamer_message, + std::sync::Arc::clone(&blocks_cache), + std::sync::Arc::clone(&blocks_info_by_finality), + &near_rpc_client, + ) + .await { - Ok(streamer_message) => { - if let Err(err) = handle_streamer_message( - streamer_message, - std::sync::Arc::clone(&blocks_cache), - std::sync::Arc::clone(&blocks_info_by_finality), - &near_rpc_client, - ) - .await - { - tracing::error!("Error to handle_streamer_message: {:?}", err); - } - } - Err(err) => { - tracing::warn!("Error to get final block from redis: {:?}", err); - } - } + tracing::error!("Error to handle_streamer_message: {:?}", err); + }; + // Sleep for 500ms before the next iteration + tokio::time::sleep(std::time::Duration::from_millis(500)).await; } } // Task to get and store optimistic block in the cache -// Subscribe to the redis channel and update the optimistic block in the cache pub async fn update_optimistic_block_regularly( blocks_info_by_finality: std::sync::Arc<BlocksInfoByFinality>, - finality_blocks_storage: cache_storage::BlocksByFinalityCache, + fastnear_client: near_lake_framework::FastNearClient, ) { tracing::info!("Task to get and store optimistic block in the cache started"); loop { - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - match finality_blocks_storage - .get_block_by_finality(near_primitives::types::Finality::None) - .await + let streamer_message = + near_lake_framework::fastnear::fetchers::fetch_optimistic_block( + &fastnear_client, + ) + .await; + if streamer_message.block.header.height as i64 + > crate::metrics::LATEST_BLOCK_HEIGHT_BY_FINALITIY + .with_label_values(&["optimistic"]) + .get() { - Ok(streamer_message) => { - let optimistic_block = BlockInfo::new_from_streamer_message(streamer_message).await; - let optimistic_block_cache = optimistic_block.block_cache; - if optimistic_block_cache.block_height as i64 - > crate::metrics::LATEST_BLOCK_HEIGHT_BY_FINALITIY - .with_label_values(&["optimistic"]) - .get() - { - blocks_info_by_finality - .update_optimistic_block(optimistic_block) - .await; - crate::metrics::LATEST_BLOCK_HEIGHT_BY_FINALITIY - .with_label_values(&["optimistic"]) - .set( - i64::try_from(optimistic_block_cache.block_height) - .expect("Invalid optimistic block height"), - ); - } - } - Err(err) => { - tracing::warn!("Error to get optimistic block from redis: {:?}", err); - } + let optimistic_block = BlockInfo::new_from_streamer_message(streamer_message).await; + let optimistic_block_cache = optimistic_block.block_cache; + blocks_info_by_finality + .update_optimistic_block(optimistic_block) + .await; + crate::metrics::LATEST_BLOCK_HEIGHT_BY_FINALITIY + .with_label_values(&["optimistic"]) + .set( + i64::try_from(optimistic_block_cache.block_height) + .expect("Invalid optimistic block height"), + ); } // When an optimistic block is not updated, or it is lower than the final block @@ -345,6 +294,8 @@ pub async fn update_optimistic_block_regularly( crate::metrics::OPTIMISTIC_UPDATING.set_working(); tracing::info!("Optimistic block updating is resumed."); }; + // Sleep for 500ms before the next iteration + tokio::time::sleep(std::time::Duration::from_millis(500)).await; } } From f4b425d115cc217ec8239b2dc1976d9b72f7587c Mon Sep 17 00:00:00 2001 From: Yurii Koba <kobayurii88@gmail.com> Date: Wed, 2 Oct 2024 17:41:06 +0300 Subject: [PATCH 2/9] imrovement --- configuration/src/configs/lake.rs | 18 ++++++++++++------ rpc-server/src/config.rs | 3 ++- state-indexer/src/main.rs | 5 ++++- tx-indexer/src/main.rs | 2 +- 4 files changed, 19 insertions(+), 9 deletions(-) diff --git a/configuration/src/configs/lake.rs b/configuration/src/configs/lake.rs index caf5de5c..ea6c00aa 100644 --- a/configuration/src/configs/lake.rs +++ b/configuration/src/configs/lake.rs @@ -1,8 +1,7 @@ +use crate::configs::deserialize_optional_data_or_env; use near_lake_framework::near_indexer_primitives::near_primitives; use serde_derive::Deserialize; -use crate::configs::deserialize_optional_data_or_env; - #[derive(Debug, Clone)] pub struct LakeConfig { pub num_threads: Option<u64>, @@ -12,13 +11,20 @@ impl LakeConfig { pub async fn lake_config( &self, start_block_height: near_primitives::types::BlockHeight, + chain_id: crate::ChainId, ) -> anyhow::Result<near_lake_framework::FastNearConfig> { - let config_builder = near_lake_framework::FastNearConfigBuilder::default(); + let mut config_builder = near_lake_framework::FastNearConfigBuilder::default(); + match chain_id { + crate::ChainId::Mainnet => config_builder = config_builder.mainnet(), + // Testnet is the default chain for other chain_id + _ => config_builder = config_builder.testnet(), + }; + if let Some(num_threads) = self.num_threads { + config_builder = config_builder.num_threads(num_threads); + }; Ok(config_builder - .mainnet() .start_block_height(start_block_height) - .build() - .expect("Failed to build LakeConfig")) + .build()?) } } diff --git a/rpc-server/src/config.rs b/rpc-server/src/config.rs index b465bdce..b1419f7c 100644 --- a/rpc-server/src/config.rs +++ b/rpc-server/src/config.rs @@ -45,7 +45,7 @@ impl GenesisInfo { #[derive(Clone)] pub struct ServerContext { - /// Lake s3 client + /// Fastnear client pub fastnear_client: near_lake_framework::FastNearClient, /// Database manager pub db_manager: std::sync::Arc<Box<dyn database::ReaderDbManager + Sync + Send + 'static>>, @@ -110,6 +110,7 @@ impl ServerContext { .optimistic_cache_block() .await .block_height, + rpc_server_config.general.chain_id.clone(), ) .await?; diff --git a/state-indexer/src/main.rs b/state-indexer/src/main.rs index 170ad1e1..879661df 100644 --- a/state-indexer/src/main.rs +++ b/state-indexer/src/main.rs @@ -36,7 +36,10 @@ async fn main() -> anyhow::Result<()> { ) .await?; - let lake_config = indexer_config.lake_config.lake_config(start_block_height).await?; + let lake_config = indexer_config + .lake_config + .lake_config(start_block_height, indexer_config.general.chain_id.clone()) + .await?; let (sender, stream) = near_lake_framework::streamer(lake_config); // Initiate metrics http server diff --git a/tx-indexer/src/main.rs b/tx-indexer/src/main.rs index 4af238b5..98c471f8 100644 --- a/tx-indexer/src/main.rs +++ b/tx-indexer/src/main.rs @@ -66,7 +66,7 @@ async fn main() -> anyhow::Result<()> { tracing::info!(target: INDEXER, "Generating LakeConfig..."); let lake_config = indexer_config .lake_config - .lake_config(start_block_height) + .lake_config(start_block_height, indexer_config.general.chain_id.clone()) .await?; tracing::info!(target: INDEXER, "Creating cache storage..."); From 17dfc8ae90c881eb782d481880fa90ef1ac80a91 Mon Sep 17 00:00:00 2001 From: Yurii Koba <kobayurii88@gmail.com> Date: Wed, 2 Oct 2024 17:57:48 +0300 Subject: [PATCH 3/9] clear deadcode --- rpc-server/src/utils.rs | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/rpc-server/src/utils.rs b/rpc-server/src/utils.rs index ac2ec1ce..9aef689e 100644 --- a/rpc-server/src/utils.rs +++ b/rpc-server/src/utils.rs @@ -192,30 +192,7 @@ pub async fn update_final_block_regularly( near_rpc_client: JsonRpcClient, ) { tracing::info!("Task to get and store final block in the cache started"); - // let mut current_protocol_version = blocks_info_by_finality.current_protocol_version().await; loop { - // TODO: - // Update protocol version from redis regularly - // match finality_blocks_storage.get_protocol_version().await { - // Ok(protocol_version) => { - // if protocol_version != current_protocol_version { - // if let Err(err) = blocks_info_by_finality - // .update_current_protocol_version(protocol_version) - // .await - // { - // tracing::error!("Failed to update protocol version from Redis: {:?}", err); - // } else { - // // If the protocol version is updated from the Redis, update the local value - // // otherwise, we will keep trying to update from the Redis - // current_protocol_version = protocol_version; - // }; - // }; - // } - // Err(err) => { - // tracing::error!("Failed to get protocol version from Redis: {:?}", err); - // } - // } - // Update final block from fastnear regularly let streamer_message = near_lake_framework::fastnear::fetchers::fetch_last_block(&fastnear_client) From 6b3a733aab6a404c98601a5c85815c62f4bb146d Mon Sep 17 00:00:00 2001 From: Yurii Koba <kobayurii88@gmail.com> Date: Thu, 3 Oct 2024 14:28:57 +0300 Subject: [PATCH 4/9] improvement according github comments --- configuration/src/default_env_configs.rs | 15 --------------- rpc-server/src/modules/blocks/methods.rs | 17 +++++++++-------- rpc-server/src/utils.rs | 6 ++++-- 3 files changed, 13 insertions(+), 25 deletions(-) diff --git a/configuration/src/default_env_configs.rs b/configuration/src/default_env_configs.rs index e3dec048..0268f72a 100644 --- a/configuration/src/default_env_configs.rs +++ b/configuration/src/default_env_configs.rs @@ -120,24 +120,9 @@ tracked_changes = "${TRACKED_CHANGES}" ### Lake framework configuration [lake_config] -<<<<<<< HEAD:configuration/src/default_env_configs.rs - -## Lake framework AWS access key id -aws_access_key_id = "${AWS_ACCESS_KEY_ID}" - -## Lake framework AWS secret access key -aws_secret_access_key = "${AWS_SECRET_ACCESS_KEY}" - -## Lake framework AWS default region -aws_default_region = "${AWS_DEFAULT_REGION}" - -## Lake framework bucket name -aws_bucket_name = "${AWS_BUCKET_NAME}" -======= # Number of threads to use for fetching data from fatnear # Default: 2 * available threads #num_threads = 8 ->>>>>>> 6909c65 (migrate to fastnear):configuration/example.config.toml ## Transaction details are stored in the Google Cloud Storage [tx_details_storage] diff --git a/rpc-server/src/modules/blocks/methods.rs b/rpc-server/src/modules/blocks/methods.rs index 5250758f..d2148457 100644 --- a/rpc-server/src/modules/blocks/methods.rs +++ b/rpc-server/src/modules/blocks/methods.rs @@ -4,7 +4,8 @@ use near_primitives::views::StateChangeValueView; use crate::config::ServerContext; use crate::modules::blocks::utils::{ - check_block_height, fetch_block_from_cache_or_get, fetch_chunk_from_fastnear, is_matching_change, + check_block_height, fetch_block_from_cache_or_get, fetch_chunk_from_fastnear, + is_matching_change, }; /// `block` rpc method implementation @@ -415,13 +416,13 @@ pub async fn fetch_block( &data.fastnear_client, block_height, ) - .await - .map_err(|err| { - tracing::error!("Failed to fetch block from fastnear: {}", err); - near_jsonrpc::primitives::types::blocks::RpcBlockError::UnknownBlock { - error_message: format!("BLOCK HEIGHT: {:?}", block_height), - } - })? + .await + .map_err(|err| { + tracing::error!("Failed to fetch block from fastnear: {}", err); + near_jsonrpc::primitives::types::blocks::RpcBlockError::UnknownBlock { + error_message: format!("BLOCK HEIGHT: {:?}", block_height), + } + })? }; Ok(near_jsonrpc::primitives::types::blocks::RpcBlockResponse { block_view }) } diff --git a/rpc-server/src/utils.rs b/rpc-server/src/utils.rs index 9aef689e..f2da281f 100644 --- a/rpc-server/src/utils.rs +++ b/rpc-server/src/utils.rs @@ -149,7 +149,9 @@ pub async fn get_current_protocol_version( near_rpc_client: &JsonRpcClient, ) -> anyhow::Result<near_primitives::version::ProtocolVersion> { let params = near_jsonrpc_client::methods::status::RpcStatusRequest; - Ok(near_rpc_client.call(params, None).await?.protocol_version) + let protocol_version = near_rpc_client.call(params, None).await?.protocol_version; + crate::metrics::CURRENT_PROTOCOL_VERSION.set(protocol_version as i64); + Ok(protocol_version) } async fn handle_streamer_message( @@ -205,7 +207,7 @@ pub async fn update_final_block_regularly( ) .await { - tracing::error!("Error to handle_streamer_message: {:?}", err); + tracing::error!("Error in fn handle_streamer_message(): {:?}", err); }; // Sleep for 500ms before the next iteration tokio::time::sleep(std::time::Duration::from_millis(500)).await; From a93af1bef848bee12d8a4b66ebdabe228d3a1092 Mon Sep 17 00:00:00 2001 From: Yurii Koba <kobayurii88@gmail.com> Date: Thu, 10 Oct 2024 18:19:23 +0300 Subject: [PATCH 5/9] reffactoring and improvement --- configuration/src/configs/lake.rs | 12 ++ rpc-server/src/config.rs | 40 +++---- rpc-server/src/main.rs | 50 +++------ rpc-server/src/modules/blocks/mod.rs | 45 +++----- rpc-server/src/utils.rs | 159 +++++++++++++++------------ 5 files changed, 145 insertions(+), 161 deletions(-) diff --git a/configuration/src/configs/lake.rs b/configuration/src/configs/lake.rs index ea6c00aa..a4a74cb0 100644 --- a/configuration/src/configs/lake.rs +++ b/configuration/src/configs/lake.rs @@ -26,6 +26,18 @@ impl LakeConfig { .start_block_height(start_block_height) .build()?) } + + pub async fn lake_client( + &self, + chain_id: crate::ChainId, + ) -> anyhow::Result<near_lake_framework::FastNearClient> { + let fast_near_endpoint = match chain_id { + crate::ChainId::Mainnet => String::from("https://mainnet.neardata.xyz"), + // Testnet is the default chain for other chain_id + _ => String::from("https://testnet.neardata.xyz"), + }; + Ok(near_lake_framework::FastNearClient::new(fast_near_endpoint)) + } } #[derive(Deserialize, Debug, Clone, Default)] diff --git a/rpc-server/src/config.rs b/rpc-server/src/config.rs index b1419f7c..0410a5e1 100644 --- a/rpc-server/src/config.rs +++ b/rpc-server/src/config.rs @@ -5,6 +5,7 @@ use near_lake_framework::FastNearClient; use near_primitives::epoch_manager::{AllEpochConfig, EpochConfig}; use crate::modules::blocks::{BlocksInfoByFinality, CacheBlock}; +use crate::utils; static NEARD_VERSION: &str = env!("CARGO_PKG_VERSION"); static NEARD_BUILD: &str = env!("BUILD_VERSION"); @@ -83,38 +84,33 @@ pub struct ServerContext { } impl ServerContext { - pub async fn init( - rpc_server_config: configuration::RpcServerConfig, - near_rpc_client: crate::utils::JsonRpcClient, - ) -> anyhow::Result<Self> { + pub async fn init(rpc_server_config: configuration::RpcServerConfig) -> anyhow::Result<Self> { let contract_code_cache_size_in_bytes = - crate::utils::gigabytes_to_bytes(rpc_server_config.general.contract_code_cache_size) - .await; + utils::gigabytes_to_bytes(rpc_server_config.general.contract_code_cache_size).await; let contract_code_cache = std::sync::Arc::new(crate::cache::RwLockLruMemoryCache::new( contract_code_cache_size_in_bytes, )); let block_cache_size_in_bytes = - crate::utils::gigabytes_to_bytes(rpc_server_config.general.block_cache_size).await; + utils::gigabytes_to_bytes(rpc_server_config.general.block_cache_size).await; let blocks_cache = std::sync::Arc::new(crate::cache::RwLockLruMemoryCache::new( block_cache_size_in_bytes, )); - - let blocks_info_by_finality = - std::sync::Arc::new(BlocksInfoByFinality::new(&near_rpc_client, &blocks_cache).await); - - let fastnear_config = rpc_server_config - .lake_config - .lake_config( - blocks_info_by_finality - .optimistic_cache_block() - .await - .block_height, - rpc_server_config.general.chain_id.clone(), - ) - .await?; + let near_rpc_client = utils::JsonRpcClient::new( + rpc_server_config.general.near_rpc_url.clone(), + rpc_server_config.general.near_archival_rpc_url.clone(), + ); + // We want to set a custom referer to let NEAR JSON RPC nodes know that we are a read-rpc instance + let near_rpc_client = near_rpc_client.header( + "Referer".to_string(), + rpc_server_config.general.referer_header_value.clone(), + )?; - let fastnear_client = FastNearClient::from_conf(&fastnear_config); + let fastnear_client = rpc_server_config.lake_config.lake_client(rpc_server_config.general.chain_id).await?; + + let blocks_info_by_finality = std::sync::Arc::new( + BlocksInfoByFinality::new(&near_rpc_client, &fastnear_client).await, + ); let tx_details_storage = tx_details_storage::TxDetailsStorage::new( rpc_server_config.tx_details_storage.storage_client().await, diff --git a/rpc-server/src/main.rs b/rpc-server/src/main.rs index 774200de..5928978b 100644 --- a/rpc-server/src/main.rs +++ b/rpc-server/src/main.rs @@ -328,43 +328,16 @@ async fn main() -> anyhow::Result<()> { let rpc_server_config = configuration::read_configuration::<configuration::RpcServerConfig>().await?; - let near_rpc_client = utils::JsonRpcClient::new( - rpc_server_config.general.near_rpc_url.clone(), - rpc_server_config.general.near_archival_rpc_url.clone(), - ); - // We want to set a custom referer to let NEAR JSON RPC nodes know that we are a read-rpc instance - let near_rpc_client = near_rpc_client.header( - "Referer".to_string(), - rpc_server_config.general.referer_header_value.clone(), - )?; - - let server_port = rpc_server_config.general.server_port; - - let server_context = actix_web::web::Data::new( - config::ServerContext::init(rpc_server_config.clone(), near_rpc_client.clone()).await?, - ); - - // Update final block from fastnear - let blocks_cache_clone = std::sync::Arc::clone(&server_context.blocks_cache); - let blocks_info_by_finality_clone = - std::sync::Arc::clone(&server_context.blocks_info_by_finality); - let fastnear_client = server_context.fastnear_client.clone(); - tokio::spawn(async move { - utils::update_final_block_regularly( - blocks_cache_clone, - blocks_info_by_finality_clone, - fastnear_client, - near_rpc_client, - ) - .await - }); + let server_context = + actix_web::web::Data::new(config::ServerContext::init(rpc_server_config.clone()).await?); - // Update optimistic block from fastnear - let blocks_info_by_finality = std::sync::Arc::clone(&server_context.blocks_info_by_finality); - let fastnear_client = server_context.fastnear_client.clone(); - tokio::spawn(async move { - utils::update_optimistic_block_regularly(blocks_info_by_finality, fastnear_client).await - }); + utils::task_regularly_update_blocks_by_finality( + std::sync::Arc::clone(&server_context.blocks_info_by_finality), + std::sync::Arc::clone(&server_context.blocks_cache), + server_context.fastnear_client.clone(), + server_context.near_rpc_client.clone(), + ) + .await; actix_web::HttpServer::new(move || { let cors = actix_cors::Cors::permissive(); @@ -377,7 +350,10 @@ async fn main() -> anyhow::Result<()> { .service(metrics::get_metrics) .service(health::get_health_status) }) - .bind(format!("0.0.0.0:{:0>5}", server_port))? + .bind(format!( + "0.0.0.0:{:0>5}", + rpc_server_config.general.server_port + ))? .run() .await?; diff --git a/rpc-server/src/modules/blocks/mod.rs b/rpc-server/src/modules/blocks/mod.rs index 246eb6c7..7ba35bde 100644 --- a/rpc-server/src/modules/blocks/mod.rs +++ b/rpc-server/src/modules/blocks/mod.rs @@ -74,15 +74,6 @@ pub struct BlockInfo { } impl BlockInfo { - // Create new BlockInfo from BlockView. this method is useful only for start rpc-server. - pub async fn new_from_block_view(block_view: near_primitives::views::BlockView) -> Self { - Self { - block_cache: CacheBlock::from(&block_view), - block_view, - changes: vec![], // We left changes empty because block_view doesn't contain state changes. - } - } - // Create new BlockInfo from StreamerMessage. // This is using to update final and optimistic blocks regularly. pub async fn new_from_streamer_message( @@ -294,35 +285,25 @@ pub struct BlocksInfoByFinality { impl BlocksInfoByFinality { pub async fn new( near_rpc_client: &crate::utils::JsonRpcClient, - blocks_cache: &std::sync::Arc<crate::cache::RwLockLruMemoryCache<u64, CacheBlock>>, + fast_near_client: &near_lake_framework::FastNearClient, ) -> Self { - let final_block_future = crate::utils::get_final_block(near_rpc_client, false); - let optimistic_block_future = crate::utils::get_final_block(near_rpc_client, true); - let validators_future = crate::utils::get_current_validators(near_rpc_client); - let protocol_version_future = crate::utils::get_current_protocol_version(near_rpc_client); - - let (final_block, optimistic_block, validators, protocol_version) = futures::try_join!( - final_block_future, - optimistic_block_future, - validators_future, - protocol_version_future - ) - .map_err(|err| { - tracing::error!("Error to fetch final block info: {:?}", err); - err - }) - .expect("Error to get final block info"); - - blocks_cache - .put(final_block.header.height, CacheBlock::from(&final_block)) - .await; + let final_block = + near_lake_framework::fastnear::fetchers::fetch_last_block(fast_near_client).await; + let optimistic_block = + near_lake_framework::fastnear::fetchers::fetch_optimistic_block(fast_near_client).await; + let validators = crate::utils::get_current_validators(near_rpc_client) + .await + .expect("Failed to get current validators"); + let protocol_version = crate::utils::get_current_protocol_version(near_rpc_client) + .await + .expect("Failed to get current protocol version"); Self { final_block: futures_locks::RwLock::new( - BlockInfo::new_from_block_view(final_block).await, + BlockInfo::new_from_streamer_message(final_block).await, ), optimistic_block: futures_locks::RwLock::new( - BlockInfo::new_from_block_view(optimistic_block).await, + BlockInfo::new_from_streamer_message(optimistic_block).await, ), optimistic_changes: futures_locks::RwLock::new(OptimisticChanges::new()), current_validators: futures_locks::RwLock::new(CurrentValidatorInfo { validators }), diff --git a/rpc-server/src/utils.rs b/rpc-server/src/utils.rs index f2da281f..eb25248b 100644 --- a/rpc-server/src/utils.rs +++ b/rpc-server/src/utils.rs @@ -105,37 +105,6 @@ impl JsonRpcClient { } } -pub async fn get_final_block( - near_rpc_client: &JsonRpcClient, - optimistic: bool, -) -> anyhow::Result<near_primitives::views::BlockView> { - let block_request_method = near_jsonrpc_client::methods::block::RpcBlockRequest { - block_reference: near_primitives::types::BlockReference::Finality(if optimistic { - near_primitives::types::Finality::None - } else { - near_primitives::types::Finality::Final - }), - }; - let block_view = near_rpc_client.call(block_request_method, None).await?; - - // Updating the metric to expose the block height considered as final by the server - // this metric can be used to calculate the lag between the server and the network - // Prometheus Gauge Metric type do not support u64 - // https://github.com/tikv/rust-prometheus/issues/470 - if optimistic { - // optimistic block height - crate::metrics::LATEST_BLOCK_HEIGHT_BY_FINALITIY - .with_label_values(&["optimistic"]) - .set(i64::try_from(block_view.header.height)?); - } else { - // final block height - crate::metrics::LATEST_BLOCK_HEIGHT_BY_FINALITIY - .with_label_values(&["final"]) - .set(i64::try_from(block_view.header.height)?); - } - Ok(block_view) -} - pub async fn get_current_validators( near_rpc_client: &JsonRpcClient, ) -> anyhow::Result<near_primitives::views::EpochValidatorInfo> { @@ -150,7 +119,6 @@ pub async fn get_current_protocol_version( ) -> anyhow::Result<near_primitives::version::ProtocolVersion> { let params = near_jsonrpc_client::methods::status::RpcStatusRequest; let protocol_version = near_rpc_client.call(params, None).await?.protocol_version; - crate::metrics::CURRENT_PROTOCOL_VERSION.set(protocol_version as i64); Ok(protocol_version) } @@ -187,63 +155,87 @@ async fn handle_streamer_message( } // Task to get and store final block in the cache -pub async fn update_final_block_regularly( +async fn task_update_final_block_regularly( blocks_cache: std::sync::Arc<crate::cache::RwLockLruMemoryCache<u64, CacheBlock>>, blocks_info_by_finality: std::sync::Arc<BlocksInfoByFinality>, fastnear_client: near_lake_framework::FastNearClient, near_rpc_client: JsonRpcClient, ) { tracing::info!("Task to get and store final block in the cache started"); - loop { - // Update final block from fastnear regularly - let streamer_message = - near_lake_framework::fastnear::fetchers::fetch_last_block(&fastnear_client) - .await; - if let Err(err) = handle_streamer_message( - streamer_message, - std::sync::Arc::clone(&blocks_cache), - std::sync::Arc::clone(&blocks_info_by_finality), - &near_rpc_client, - ) + let mut final_block_height = blocks_info_by_finality + .final_cache_block() .await + .block_height; + loop { + final_block_height += 1; + if let Some(streamer_message) = + near_lake_framework::fastnear::fetchers::fetch_streamer_message( + &fastnear_client, + final_block_height, + ) + .await { - tracing::error!("Error in fn handle_streamer_message(): {:?}", err); - }; - // Sleep for 500ms before the next iteration - tokio::time::sleep(std::time::Duration::from_millis(500)).await; + if let Err(err) = handle_streamer_message( + streamer_message, + std::sync::Arc::clone(&blocks_cache), + std::sync::Arc::clone(&blocks_info_by_finality), + &near_rpc_client, + ) + .await + { + tracing::error!("Error in fn handle_streamer_message(): {:?}", err); + }; + } } } // Task to get and store optimistic block in the cache -pub async fn update_optimistic_block_regularly( +async fn task_update_optimistic_block_regularly( blocks_info_by_finality: std::sync::Arc<BlocksInfoByFinality>, fastnear_client: near_lake_framework::FastNearClient, ) { tracing::info!("Task to get and store optimistic block in the cache started"); + let mut optimistic_block_height = blocks_info_by_finality + .optimistic_cache_block() + .await + .block_height; loop { - let streamer_message = - near_lake_framework::fastnear::fetchers::fetch_optimistic_block( + optimistic_block_height += 1; + if let Some(streamer_message) = + near_lake_framework::fastnear::fetchers::fetch_optimistic_block_by_height( &fastnear_client, + optimistic_block_height, ) - .await; - if streamer_message.block.header.height as i64 - > crate::metrics::LATEST_BLOCK_HEIGHT_BY_FINALITIY - .with_label_values(&["optimistic"]) - .get() + .await { - let optimistic_block = BlockInfo::new_from_streamer_message(streamer_message).await; - let optimistic_block_cache = optimistic_block.block_cache; - blocks_info_by_finality - .update_optimistic_block(optimistic_block) - .await; - crate::metrics::LATEST_BLOCK_HEIGHT_BY_FINALITIY - .with_label_values(&["optimistic"]) - .set( - i64::try_from(optimistic_block_cache.block_height) - .expect("Invalid optimistic block height"), - ); + if streamer_message.block.header.height as i64 + > crate::metrics::LATEST_BLOCK_HEIGHT_BY_FINALITIY + .with_label_values(&["optimistic"]) + .get() + { + let optimistic_block = BlockInfo::new_from_streamer_message(streamer_message).await; + let optimistic_block_cache = optimistic_block.block_cache; + blocks_info_by_finality + .update_optimistic_block(optimistic_block) + .await; + crate::metrics::LATEST_BLOCK_HEIGHT_BY_FINALITIY + .with_label_values(&["optimistic"]) + .set( + i64::try_from(optimistic_block_cache.block_height) + .expect("Invalid optimistic block height"), + ); + } } + } +} +// Task to check optimistic block status +async fn task_optimistic_block_status() { + tracing::info!("Task to check optimistic block status started"); + loop { + // check every 2 seconds + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + // When an optimistic block is not updated, or it is lower than the final block // we need to mark that optimistic updating is not working if crate::metrics::LATEST_BLOCK_HEIGHT_BY_FINALITIY @@ -272,12 +264,39 @@ pub async fn update_optimistic_block_regularly( { crate::metrics::OPTIMISTIC_UPDATING.set_working(); tracing::info!("Optimistic block updating is resumed."); - }; - // Sleep for 500ms before the next iteration - tokio::time::sleep(std::time::Duration::from_millis(500)).await; + } } } +pub async fn task_regularly_update_blocks_by_finality( + blocks_info_by_finality: std::sync::Arc<BlocksInfoByFinality>, + blocks_cache: std::sync::Arc<crate::cache::RwLockLruMemoryCache<u64, CacheBlock>>, + fastnear_client: near_lake_framework::FastNearClient, + near_rpc_client: JsonRpcClient, +) { + // Task update final block regularly + let blocks_info_by_finality_clone = std::sync::Arc::clone(&blocks_info_by_finality); + let fastnear_client_clone = fastnear_client.clone(); + + tokio::spawn(async move { + task_update_final_block_regularly( + blocks_cache, + blocks_info_by_finality_clone, + fastnear_client_clone, + near_rpc_client, + ) + .await + }); + + // Task update optimistic block regularly + tokio::spawn(async move { + task_update_optimistic_block_regularly(blocks_info_by_finality, fastnear_client).await + }); + + // Task to check the optimistic block status + tokio::spawn(async move { task_optimistic_block_status().await }); +} + /// Convert gigabytes to bytes pub(crate) async fn gigabytes_to_bytes(gigabytes: f64) -> usize { (gigabytes * 1024.0 * 1024.0 * 1024.0) as usize From f81ab12bbcd977e44c84e404b9935f2907d87a82 Mon Sep 17 00:00:00 2001 From: Yurii Koba <kobayurii88@gmail.com> Date: Thu, 10 Oct 2024 18:29:32 +0300 Subject: [PATCH 6/9] fmt --- rpc-server/src/utils.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rpc-server/src/utils.rs b/rpc-server/src/utils.rs index eb25248b..b2cc32ab 100644 --- a/rpc-server/src/utils.rs +++ b/rpc-server/src/utils.rs @@ -233,9 +233,9 @@ async fn task_update_optimistic_block_regularly( async fn task_optimistic_block_status() { tracing::info!("Task to check optimistic block status started"); loop { - // check every 2 seconds - tokio::time::sleep(std::time::Duration::from_secs(2)).await; - + // check every second + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + // When an optimistic block is not updated, or it is lower than the final block // we need to mark that optimistic updating is not working if crate::metrics::LATEST_BLOCK_HEIGHT_BY_FINALITIY From cd814ccc762cce0fd95069d534edbd1a2f6aa241 Mon Sep 17 00:00:00 2001 From: Yurii Koba <kobayurii88@gmail.com> Date: Thu, 7 Nov 2024 14:44:52 +0200 Subject: [PATCH 7/9] clippy --- rpc-server/src/config.rs | 13 +++++++------ rpc-server/src/modules/blocks/methods.rs | 2 +- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/rpc-server/src/config.rs b/rpc-server/src/config.rs index 0410a5e1..aaa7c2f8 100644 --- a/rpc-server/src/config.rs +++ b/rpc-server/src/config.rs @@ -1,7 +1,6 @@ use std::string::ToString; use futures::executor::block_on; -use near_lake_framework::FastNearClient; use near_primitives::epoch_manager::{AllEpochConfig, EpochConfig}; use crate::modules::blocks::{BlocksInfoByFinality, CacheBlock}; @@ -32,10 +31,9 @@ impl GenesisInfo { ) .await .expect("Error to get genesis config"); - + let genesis_block = - near_lake_framework::fastnear::fetchers::fetch_first_block(fastnear_client) - .await; + near_lake_framework::fastnear::fetchers::fetch_first_block(fastnear_client).await; Self { genesis_config, @@ -105,8 +103,11 @@ impl ServerContext { "Referer".to_string(), rpc_server_config.general.referer_header_value.clone(), )?; - - let fastnear_client = rpc_server_config.lake_config.lake_client(rpc_server_config.general.chain_id).await?; + + let fastnear_client = rpc_server_config + .lake_config + .lake_client(rpc_server_config.general.chain_id) + .await?; let blocks_info_by_finality = std::sync::Arc::new( BlocksInfoByFinality::new(&near_rpc_client, &fastnear_client).await, diff --git a/rpc-server/src/modules/blocks/methods.rs b/rpc-server/src/modules/blocks/methods.rs index d2148457..99b73502 100644 --- a/rpc-server/src/modules/blocks/methods.rs +++ b/rpc-server/src/modules/blocks/methods.rs @@ -485,7 +485,7 @@ pub async fn fetch_chunk( .map(|block_height_shard_id| (block_height_shard_id.0, block_height_shard_id.1))?, }; let chunk_view = - fetch_chunk_from_fastnear(&data.fastnear_client, block_height, shard_id).await?; + fetch_chunk_from_fastnear(&data.fastnear_client, block_height, shard_id.into()).await?; // increase block category metrics crate::metrics::increase_request_category_metrics( data, From 9e4a2d57201692aad3e9dd56dd1c745b32394e3d Mon Sep 17 00:00:00 2001 From: Yurii Koba <kobayurii88@gmail.com> Date: Tue, 24 Dec 2024 13:23:21 +0200 Subject: [PATCH 8/9] add requests_blocks_counters metrics --- CHANGELOG.md | 4 +++ rpc-server/src/metrics.rs | 5 +++ rpc-server/src/modules/blocks/methods.rs | 40 +++++++++++++++++++++--- rpc-server/src/modules/blocks/utils.rs | 7 ++++- 4 files changed, 51 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ebd1936..4fb67f6c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased](https://github.com/near/read-rpc/compare/main...develop) +### What's Changed +* Migrate from lake data to fastnear data +* Add metrics to calculate the number of blocks which fetched from the cache and fastnear + ## [0.3.3](https://github.com/near/read-rpc/releases/tag/v0.3.3) ### Supported Nearcore Version diff --git a/rpc-server/src/metrics.rs b/rpc-server/src/metrics.rs index 5af0d36e..08359e48 100644 --- a/rpc-server/src/metrics.rs +++ b/rpc-server/src/metrics.rs @@ -120,6 +120,11 @@ lazy_static! { prometheus::register(Box::new(counter_vec.clone())).unwrap(); counter_vec }; + pub(crate) static ref REQUESTS_BLOCKS_COUNTERS: IntCounterVec = register_int_counter_vec( + "requests_blocks_counters", + "Total number of requests blocks from Lake and Cache", + &["method_name", "source"] // // This declares a label named `method_name` and `source`(lake or cache) + ).unwrap(); // Error metrics // 0: ReadRPC success, NEAR RPC success" diff --git a/rpc-server/src/modules/blocks/methods.rs b/rpc-server/src/modules/blocks/methods.rs index 99b73502..76ed0ad7 100644 --- a/rpc-server/src/modules/blocks/methods.rs +++ b/rpc-server/src/modules/blocks/methods.rs @@ -261,7 +261,13 @@ async fn changes_in_block_call( } })?; - let result = fetch_changes_in_block(&data, cache_block, ¶ms.block_reference).await; + let result = fetch_changes_in_block( + &data, + cache_block, + ¶ms.block_reference, + "EXPERIMENTAL_changes_in_block", + ) + .await; #[cfg(feature = "shadow-data-consistency")] { if let near_primitives::types::BlockReference::Finality(_) = params.block_reference { @@ -306,6 +312,7 @@ async fn changes_in_block_by_type_call( cache_block, ¶ms.state_changes_request, ¶ms.block_reference, + "EXPERIMENTAL_changes", ) .await; @@ -367,6 +374,9 @@ pub async fn fetch_block( return match finality { near_primitives::types::Finality::Final | near_primitives::types::Finality::DoomSlug => { + crate::metrics::REQUESTS_BLOCKS_COUNTERS + .with_label_values(&[method_name, "cache"]) + .inc(); let block_view = data.blocks_info_by_finality.final_block_view().await; Ok(near_jsonrpc::primitives::types::blocks::RpcBlockResponse { block_view }) } @@ -379,6 +389,9 @@ pub async fn fetch_block( }, ) } else { + crate::metrics::REQUESTS_BLOCKS_COUNTERS + .with_label_values(&[method_name, "cache"]) + .inc(); let block_view = data.blocks_info_by_finality.optimistic_block_view().await; Ok( near_jsonrpc::primitives::types::blocks::RpcBlockResponse { @@ -402,6 +415,9 @@ pub async fn fetch_block( .await .block_height { + crate::metrics::REQUESTS_BLOCKS_COUNTERS + .with_label_values(&[method_name, "cache"]) + .inc(); data.blocks_info_by_finality.final_block_view().await } else if block_height == data @@ -410,8 +426,14 @@ pub async fn fetch_block( .await .block_height { + crate::metrics::REQUESTS_BLOCKS_COUNTERS + .with_label_values(&[method_name, "cache"]) + .inc(); data.blocks_info_by_finality.optimistic_block_view().await } else { + crate::metrics::REQUESTS_BLOCKS_COUNTERS + .with_label_values(&[method_name, "lake"]) + .inc(); near_lake_framework::fastnear::fetchers::fetch_block_or_retry( &data.fastnear_client, block_height, @@ -484,6 +506,9 @@ pub async fn fetch_chunk( ) .map(|block_height_shard_id| (block_height_shard_id.0, block_height_shard_id.1))?, }; + crate::metrics::REQUESTS_BLOCKS_COUNTERS + .with_label_values(&[method_name, "lake"]) + .inc(); let chunk_view = fetch_chunk_from_fastnear(&data.fastnear_client, block_height, shard_id.into()).await?; // increase block category metrics @@ -505,11 +530,12 @@ async fn fetch_changes_in_block( data: &Data<ServerContext>, cache_block: crate::modules::blocks::CacheBlock, block_reference: &near_primitives::types::BlockReference, + method_name: &str, ) -> Result< near_jsonrpc::primitives::types::changes::RpcStateChangesInBlockByTypeResponse, near_jsonrpc::primitives::types::changes::RpcStateChangesError, > { - let trie_keys = fetch_state_changes(data, cache_block, block_reference) + let trie_keys = fetch_state_changes(data, cache_block, block_reference, method_name) .await .map_err(|err| { near_jsonrpc::primitives::types::changes::RpcStateChangesError::UnknownBlock { @@ -593,11 +619,12 @@ async fn fetch_changes_in_block_by_type( cache_block: crate::modules::blocks::CacheBlock, state_changes_request: &near_primitives::views::StateChangesRequestView, block_reference: &near_primitives::types::BlockReference, + method_name: &str, ) -> Result< near_jsonrpc::primitives::types::changes::RpcStateChangesInBlockResponse, near_jsonrpc::primitives::types::changes::RpcStateChangesError, > { - let changes = fetch_state_changes(data, cache_block, block_reference) + let changes = fetch_state_changes(data, cache_block, block_reference, method_name) .await .map_err(|err| { near_jsonrpc::primitives::types::changes::RpcStateChangesError::UnknownBlock { @@ -624,6 +651,7 @@ async fn fetch_state_changes( data: &Data<ServerContext>, cache_block: crate::modules::blocks::CacheBlock, block_reference: &near_primitives::types::BlockReference, + method_name: &str, ) -> anyhow::Result<near_primitives::views::StateChangesView> { if let near_primitives::types::BlockReference::Finality(finality) = block_reference { match finality { @@ -645,7 +673,7 @@ async fn fetch_state_changes( } } } else { - Ok(fetch_shards_by_cache_block(data, cache_block) + Ok(fetch_shards_by_cache_block(data, cache_block, method_name) .await? .into_iter() .flat_map(|shard| shard.state_changes) @@ -658,7 +686,11 @@ async fn fetch_state_changes( async fn fetch_shards_by_cache_block( data: &Data<ServerContext>, cache_block: crate::modules::blocks::CacheBlock, + method_name: &str, ) -> anyhow::Result<Vec<near_indexer_primitives::IndexerShard>> { + crate::metrics::REQUESTS_BLOCKS_COUNTERS + .with_label_values(&[method_name, "lake"]) + .inc(); match near_lake_framework::fastnear::fetchers::fetch_streamer_message( &data.fastnear_client, cache_block.block_height, diff --git a/rpc-server/src/modules/blocks/utils.rs b/rpc-server/src/modules/blocks/utils.rs index 1b8f7164..0115c901 100644 --- a/rpc-server/src/modules/blocks/utils.rs +++ b/rpc-server/src/modules/blocks/utils.rs @@ -162,7 +162,12 @@ pub async fn fetch_block_from_cache_or_get( } }; let cache_block = match block { - Some(block) => block, + Some(block) => { + crate::metrics::REQUESTS_BLOCKS_COUNTERS + .with_label_values(&[method_name, "cache"]) + .inc(); + block + } None => { let block_from_s3 = fetch_block(data, block_reference, method_name).await?; let block = CacheBlock::from(&block_from_s3.block_view); From 4ebd3c20091c0f072cd37ced22f8860b3a85df67 Mon Sep 17 00:00:00 2001 From: Yurii Koba <kobayurii88@gmail.com> Date: Tue, 24 Dec 2024 13:57:52 +0200 Subject: [PATCH 9/9] revert deeted changes for near state indexer --- Cargo.lock | 1 + cache-storage/src/lib.rs | 87 +++++++++++++++++++++++++ near-state-indexer/Cargo.toml | 1 + near-state-indexer/src/main.rs | 24 ++++++- near-state-indexer/src/utils.rs | 110 ++++++++++++++++++++++++++++++++ rpc-server/src/metrics.rs | 2 +- 6 files changed, 223 insertions(+), 2 deletions(-) create mode 100644 near-state-indexer/src/utils.rs diff --git a/Cargo.lock b/Cargo.lock index 99b4176e..64e5eab9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5105,6 +5105,7 @@ dependencies = [ "actix", "actix-web", "anyhow", + "cache-storage", "clap", "configuration", "database", diff --git a/cache-storage/src/lib.rs b/cache-storage/src/lib.rs index a41fb0bd..c9e18964 100644 --- a/cache-storage/src/lib.rs +++ b/cache-storage/src/lib.rs @@ -1,4 +1,5 @@ use futures::FutureExt; +use near_indexer_primitives::near_primitives; mod utils; @@ -114,6 +115,92 @@ impl RedisCacheStorage { } } +#[derive(Clone)] +pub struct BlocksByFinalityCache { + cache_storage: RedisCacheStorage, +} + +/// Sets the keys in Redis shared between the ReadRPC components about the most recent +/// blocks based on finalities (final or optimistic). +/// `final_height` of `optimistic_height` depending on `block_type` passed. +/// Additionally, sets the JSON serialized `StreamerMessage` into keys `final` or `optimistic` +/// accordingly. +impl BlocksByFinalityCache { + // Use redis database 0(default for redis) for handling the blocks by finality cache. + pub async fn new(redis_url: String) -> anyhow::Result<Self> { + Ok(Self { + cache_storage: RedisCacheStorage::new(redis_url, 0).await?, + }) + } + + pub async fn update_block_by_finality( + &self, + finality: near_indexer_primitives::near_primitives::types::Finality, + streamer_message: &near_indexer_primitives::StreamerMessage, + ) -> anyhow::Result<()> { + let block_height = streamer_message.block.header.height; + let block_type = serde_json::to_string(&finality)?; + + let last_height = self + .cache_storage + .get(format!("{}_height", block_type)) + .await + .unwrap_or(0); + + // If the block height is greater than the last height, update the block streamer message + // if we have a few indexers running, we need to make sure that we are not updating the same block + // or block which is already processed or block less than the last processed block + if block_height > last_height { + let json_streamer_message = serde_json::to_string(streamer_message)?; + // Update the last block height + // Create a clone of the redis client and redis cmd to avoid borrowing issues + let update_height_feature = self + .cache_storage + .set(format!("{}_height", block_type), block_height); + + // Update the block streamer message + // Create a clone of the redis client and redis cmd to avoid borrowing issues + let update_stream_msg_feature = + self.cache_storage.set(block_type, json_streamer_message); + + // Wait for both futures to complete + futures::future::join_all([ + update_height_feature.boxed(), + update_stream_msg_feature.boxed(), + ]) + .await + .into_iter() + .collect::<anyhow::Result<()>>()?; + }; + + Ok(()) + } + + pub async fn get_block_by_finality( + &self, + finality: near_indexer_primitives::near_primitives::types::Finality, + ) -> anyhow::Result<near_indexer_primitives::StreamerMessage> { + let block_type = serde_json::to_string(&finality)?; + let resp: String = self.cache_storage.get(block_type).await?; + Ok(serde_json::from_str(&resp)?) + } + + pub async fn update_protocol_version( + &self, + protocol_version: near_primitives::types::ProtocolVersion, + ) -> anyhow::Result<()> { + self.cache_storage + .set("protocol_version", protocol_version) + .await + } + + pub async fn get_protocol_version( + &self, + ) -> anyhow::Result<near_primitives::types::ProtocolVersion> { + self.cache_storage.get("protocol_version").await + } +} + #[derive(Clone)] pub struct TxIndexerCache { cache_storage: RedisCacheStorage, diff --git a/near-state-indexer/Cargo.toml b/near-state-indexer/Cargo.toml index 185921db..700d3360 100644 --- a/near-state-indexer/Cargo.toml +++ b/near-state-indexer/Cargo.toml @@ -29,6 +29,7 @@ tokio = { version = "1.36.0", features = [ tokio-stream = "0.1" tracing = "0.1.34" +cache-storage.workspace = true configuration.workspace = true database.workspace = true logic-state-indexer.workspace = true diff --git a/near-state-indexer/src/main.rs b/near-state-indexer/src/main.rs index 61228781..3d6c1abe 100644 --- a/near-state-indexer/src/main.rs +++ b/near-state-indexer/src/main.rs @@ -8,6 +8,7 @@ use logic_state_indexer::{handle_streamer_message, NearClient, INDEXER}; mod configs; mod metrics; mod near_client; +mod utils; #[actix_web::main] async fn main() -> anyhow::Result<()> { @@ -58,6 +59,12 @@ async fn run(home_dir: std::path::PathBuf) -> anyhow::Result<()> { let state_indexer_config = configuration::read_configuration::<configuration::NearStateIndexerConfig>().await?; + tracing::info!(target: INDEXER, "Connecting to redis..."); + let finality_blocks_storage = cache_storage::BlocksByFinalityCache::new( + state_indexer_config.general.redis_url.to_string(), + ) + .await?; + tracing::info!(target: INDEXER, "Setup near_indexer..."); let indexer_config = near_indexer::IndexerConfig { home_dir, @@ -73,7 +80,7 @@ async fn run(home_dir: std::path::PathBuf) -> anyhow::Result<()> { // Regular indexer process starts here tracing::info!(target: INDEXER, "Instantiating the stream..."); let stream = indexer.streamer(); - let (view_client, _) = indexer.client_actors(); + let (view_client, client) = indexer.client_actors(); let genesis_config = indexer.near_config().genesis.config.clone(); let shard_layout = logic_state_indexer::configs::shard_layout(genesis_config).await?; @@ -92,6 +99,21 @@ async fn run(home_dir: std::path::PathBuf) -> anyhow::Result<()> { near_client.clone(), )); + // Initiate the job of updating the optimistic blocks to Redis + tokio::spawn(utils::update_block_in_redis_by_finality( + view_client.clone(), + client.clone(), + finality_blocks_storage.clone(), + near_indexer_primitives::near_primitives::types::Finality::None, + )); + // And the same job for the final blocks + tokio::spawn(utils::update_block_in_redis_by_finality( + view_client.clone(), + client.clone(), + finality_blocks_storage.clone(), + near_indexer_primitives::near_primitives::types::Finality::Final, + )); + // ! Note that the `handle_streamer_message` doesn't interact with the Redis tracing::info!(target: INDEXER, "Starting near_state_indexer..."); let mut handlers = tokio_stream::wrappers::ReceiverStream::new(stream) diff --git a/near-state-indexer/src/utils.rs b/near-state-indexer/src/utils.rs new file mode 100644 index 00000000..9d517c99 --- /dev/null +++ b/near-state-indexer/src/utils.rs @@ -0,0 +1,110 @@ +use near_indexer::near_primitives; +use near_o11y::WithSpanContextExt; + +const INTERVAL: std::time::Duration = std::time::Duration::from_secs(1); + +/// The universal function that fetches the block by the given finality. +/// It is used in the `update_block_in_redis_by_finality` function. +/// ! The function does not support the DoomSlug finality. +pub(crate) async fn fetch_block_by_finality( + client: &actix::Addr<near_client::ViewClientActor>, + finality: &near_primitives::types::Finality, +) -> anyhow::Result<near_primitives::views::BlockView> { + let block_reference = near_primitives::types::BlockReference::Finality(finality.clone()); + Ok(client + .send(near_client::GetBlock(block_reference).with_span_context()) + .await??) +} + +pub(crate) async fn fetch_status( + client: &actix::Addr<near_client::ClientActor>, +) -> anyhow::Result<near_primitives::views::StatusResponse> { + tracing::debug!(target: crate::INDEXER, "Fetching status"); + Ok(client + .send( + near_client::Status { + is_health_check: false, + detailed: false, + } + .with_span_context(), + ) + .await??) +} + +/// This function starts a busy-loop that does the similar job to the near-indexer one. +/// However, this one deals with the blocks by provided finality, and instead of streaming them to +/// the client, it stores the block directly to the Redis instance shared between +/// ReadRPC components. +pub async fn update_block_in_redis_by_finality( + view_client: actix::Addr<near_client::ViewClientActor>, + client: actix::Addr<near_client::ClientActor>, + finality_blocks_storage: cache_storage::BlocksByFinalityCache, + finality: near_primitives::types::Finality, +) { + let block_type = serde_json::to_string(&finality).unwrap(); + tracing::info!(target: crate::INDEXER, "Starting [{}] block update job...", block_type); + + let mut last_stored_block_height: Option<near_primitives::types::BlockHeight> = None; + loop { + tokio::time::sleep(INTERVAL).await; + + if let Ok(status) = fetch_status(&client).await { + // Update protocol version in Redis + // This is need for read-rpc to know the current protocol version + if let Err(err) = finality_blocks_storage + .update_protocol_version(status.protocol_version) + .await + { + tracing::error!( + target: crate::INDEXER, + "Failed to update protocol version in Redis: {:?}", err + ); + }; + + // If the node is not fully synced the optimistic blocks are outdated + // and are useless for our case. To avoid any misleading in our Redis + // we don't update blocks until the node is fully synced. + if status.sync_info.syncing { + continue; + } + } + + if let Ok(block) = fetch_block_by_finality(&view_client, &finality).await { + let height = block.header.height; + if let Some(block_height) = last_stored_block_height { + if height <= block_height { + continue; + } else { + last_stored_block_height = Some(height); + } + } else { + last_stored_block_height = Some(height); + }; + let response = near_indexer::build_streamer_message(&view_client, block).await; + match response { + Ok(streamer_message) => { + tracing::debug!(target: crate::INDEXER, "[{}] block {:?}", block_type, last_stored_block_height); + if let Err(err) = finality_blocks_storage + .update_block_by_finality( + near_primitives::types::Finality::None, + &streamer_message, + ) + .await + { + tracing::error!( + target: crate::INDEXER, + "Failed to publish [{}] block streamer message: {:?}", block_type, err + ); + }; + } + Err(err) => { + tracing::error!( + target: crate::INDEXER, + "Missing data, skipping block #{}...", height + ); + tracing::error!(target: crate::INDEXER, "{:#?}", err); + } + } + }; + } +} diff --git a/rpc-server/src/metrics.rs b/rpc-server/src/metrics.rs index 08359e48..e649489b 100644 --- a/rpc-server/src/metrics.rs +++ b/rpc-server/src/metrics.rs @@ -123,7 +123,7 @@ lazy_static! { pub(crate) static ref REQUESTS_BLOCKS_COUNTERS: IntCounterVec = register_int_counter_vec( "requests_blocks_counters", "Total number of requests blocks from Lake and Cache", - &["method_name", "source"] // // This declares a label named `method_name` and `source`(lake or cache) + &["method_name", "source"] // This declares a label named `method_name` and `source`(lake or cache) ).unwrap(); // Error metrics