From a01006778c7ed8dfb8835dc7d36c6a6ef361361f Mon Sep 17 00:00:00 2001 From: Yurii Koba Date: Thu, 10 Oct 2024 15:49:05 +0300 Subject: [PATCH] reffactoring --- src/lib.rs | 21 ++-- src/providers/fastnear/client.rs | 9 +- src/providers/fastnear/fetchers.rs | 15 +++ src/providers/fastnear/mod.rs | 4 +- src/providers/fastnear/types.rs | 6 -- src/providers/s3/client.rs | 148 +++++++++++++++++++++++++++ src/providers/s3/fetchers.rs | 158 +---------------------------- src/providers/s3/mod.rs | 4 +- 8 files changed, 192 insertions(+), 173 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index ff46685..bf246a4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -280,11 +280,17 @@ extern crate derive_builder; pub use near_indexer_primitives; pub use aws_credential_types::Credentials; -pub use providers::fastnear::client as fastnear_client; + +mod providers; + +pub use providers::fastnear; +pub use providers::s3; + +pub use providers::fastnear::client::FastNearClient; pub use providers::fastnear::types::{FastNearConfig, FastNearConfigBuilder}; -pub use providers::s3::client as s3_client; + +pub use providers::s3::client::LakeS3Client; pub use providers::s3::types::{LakeConfig, LakeConfigBuilder}; -pub mod providers; pub(crate) const LAKE_FRAMEWORK: &str = "near_lake_framework"; @@ -317,11 +323,10 @@ pub fn streamer>( let (sender, receiver) = tokio::sync::mpsc::channel(config.blocks_preload_pool_size()); match config { providers::NearLakeFrameworkConfig::Lake(config) => { - (tokio::spawn(providers::s3::start(sender, config)), receiver) + (tokio::spawn(s3::start(sender, config)), receiver) + } + providers::NearLakeFrameworkConfig::FastNear(config) => { + (tokio::spawn(fastnear::start(sender, config)), receiver) } - providers::NearLakeFrameworkConfig::FastNear(config) => ( - tokio::spawn(providers::fastnear::start(sender, config)), - receiver, - ), } } diff --git a/src/providers/fastnear/client.rs b/src/providers/fastnear/client.rs index 1d25d37..daba3ef 100644 --- a/src/providers/fastnear/client.rs +++ b/src/providers/fastnear/client.rs @@ -4,8 +4,8 @@ use super::types; /// It is used to fetch the blocks from the FastNear #[derive(Clone, Debug)] pub struct FastNearClient { - client: reqwest::Client, endpoint: String, + client: reqwest::Client, } impl FastNearClient { @@ -16,6 +16,13 @@ impl FastNearClient { } } + pub fn from_conf(config: &types::FastNearConfig) -> Self { + Self { + endpoint: config.endpoint.clone(), + client: reqwest::Client::new(), + } + } + /// Fetches the block from the FastNear API /// Returns the result in `Option` /// If the block does not exist, returns `None` diff --git a/src/providers/fastnear/fetchers.rs b/src/providers/fastnear/fetchers.rs index 25fb813..62209f0 100644 --- a/src/providers/fastnear/fetchers.rs +++ b/src/providers/fastnear/fetchers.rs @@ -21,6 +21,21 @@ pub async fn fetch_optimistic_block( .expect("Failed to fetch optimistic block") } +/// Fetches the optimistic block from the fastenar +/// This function is used to fetch the optimistic block by height +/// This function will be using endpoint `/v0/block_opt/:block_height` +/// This would be waiting some time until the optimistic block is available +/// Returns `near_indexer_primitives::StreamerMessage` if the block is available +/// Returns `None` if the block height is skipped +pub async fn fetch_optimistic_block_by_height( + client: &FastNearClient, + block_height: types::BlockHeight, +) -> Option { + client + .fetch_until_success(&format!("/v0/block_opt/{}", block_height)) + .await +} + /// Fetches the genesis block from the fastenar /// Returns `near_indexer_primitives::StreamerMessage` pub async fn fetch_first_block( diff --git a/src/providers/fastnear/mod.rs b/src/providers/fastnear/mod.rs index 234c2e5..a201f92 100644 --- a/src/providers/fastnear/mod.rs +++ b/src/providers/fastnear/mod.rs @@ -1,3 +1,5 @@ +use crate::FastNearClient; + pub mod client; pub mod fetchers; pub mod types; @@ -12,7 +14,7 @@ pub async fn start( blocks_sink: tokio::sync::mpsc::Sender, config: types::FastNearConfig, ) -> anyhow::Result<()> { - let client = config.client(); + let client = FastNearClient::from_conf(&config); let max_num_threads = config.num_threads; let next_sink_block = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(config.start_block_height)); diff --git a/src/providers/fastnear/types.rs b/src/providers/fastnear/types.rs index 3dcc47c..eafc7ab 100644 --- a/src/providers/fastnear/types.rs +++ b/src/providers/fastnear/types.rs @@ -31,12 +31,6 @@ pub struct FastNearConfig { pub(crate) blocks_preload_pool_size: usize, } -impl FastNearConfig { - pub fn client(&self) -> crate::fastnear_client::FastNearClient { - crate::fastnear_client::FastNearClient::new(self.endpoint.clone()) - } -} - impl FastNearConfigBuilder { /// Shortcut to set up [FastNearConfigBuilder] for mainnet /// ``` diff --git a/src/providers/s3/client.rs b/src/providers/s3/client.rs index 6d718b4..70cf91e 100644 --- a/src/providers/s3/client.rs +++ b/src/providers/s3/client.rs @@ -80,3 +80,151 @@ pub trait S3Client: Send + Sync { start_after_prefix: &str, ) -> Result, ListCommonPrefixesError>; } + +#[derive(Clone, Debug)] +pub struct LakeS3Client { + s3: aws_sdk_s3::Client, +} + +impl LakeS3Client { + pub fn new(s3: aws_sdk_s3::Client) -> Self { + Self { s3 } + } + + pub fn from_conf(config: aws_sdk_s3::config::Config) -> Self { + let s3_client = aws_sdk_s3::Client::from_conf(config); + + Self { s3: s3_client } + } +} + +#[async_trait] +impl S3Client for LakeS3Client { + async fn get_object_bytes( + &self, + bucket: &str, + prefix: &str, + ) -> Result, GetObjectBytesError> { + let object = self + .s3 + .get_object() + .bucket(bucket) + .key(prefix) + .request_payer(aws_sdk_s3::types::RequestPayer::Requester) + .send() + .await?; + + let bytes = object.body.collect().await?.into_bytes().to_vec(); + + Ok(bytes) + } + + async fn list_common_prefixes( + &self, + bucket: &str, + start_after_prefix: &str, + ) -> Result, ListCommonPrefixesError> { + let response = self + .s3 + .list_objects_v2() + .max_keys(1000) // 1000 is the default and max value for this parameter + .delimiter("/".to_string()) + .start_after(start_after_prefix) + .request_payer(aws_sdk_s3::types::RequestPayer::Requester) + .bucket(bucket) + .send() + .await?; + + let prefixes = match response.common_prefixes { + None => vec![], + Some(common_prefixes) => common_prefixes + .into_iter() + .filter_map(|common_prefix| common_prefix.prefix) + .collect::>() + .into_iter() + .filter_map(|prefix_string| prefix_string.split('/').next().map(String::from)) + .collect(), + }; + + Ok(prefixes) + } +} + +#[cfg(test)] +mod test { + use super::*; + + use std::sync::Arc; + + use crate::providers::s3::fetchers::fetch_streamer_message; + use async_trait::async_trait; + + #[derive(Clone, Debug)] + pub struct LakeS3Client {} + + #[async_trait] + impl S3Client for LakeS3Client { + async fn get_object_bytes( + &self, + _bucket: &str, + prefix: &str, + ) -> Result, GetObjectBytesError> { + let path = format!("{}/blocks/{}", env!("CARGO_MANIFEST_DIR"), prefix); + tokio::fs::read(path) + .await + .map_err(|e| GetObjectBytesError(Arc::new(e))) + } + + async fn list_common_prefixes( + &self, + _bucket: &str, + _start_after: &str, + ) -> Result, ListCommonPrefixesError> { + Ok(Vec::new()) + } + } + + #[tokio::test] + async fn deserializes_meta_transactions() { + let lake_client = LakeS3Client {}; + let streamer_message = + fetch_streamer_message(&lake_client, "near-lake-data-mainnet", 879765) + .await + .unwrap(); + + let delegate_action = &streamer_message.shards[0] + .chunk + .as_ref() + .unwrap() + .transactions[0] + .transaction + .actions[0]; + + assert_eq!( + serde_json::to_value(delegate_action).unwrap(), + serde_json::json!({ + "Delegate": { + "delegate_action": { + "sender_id": "test.near", + "receiver_id": "test.near", + "actions": [ + { + "AddKey": { + "public_key": "ed25519:CnQMksXTTtn81WdDujsEMQgKUMkFvDJaAjDeDLTxVrsg", + "access_key": { + "nonce": 0, + "permission": "FullAccess" + } + } + } + ], + "nonce": 879546, + "max_block_height": 100, + "public_key": "ed25519:8Rn4FJeeRYcrLbcrAQNFVgvbZ2FCEQjgydbXwqBwF1ib" + }, + "signature": "ed25519:25uGrsJNU3fVgUpPad3rGJRy2XQum8gJxLRjKFCbd7gymXwUxQ9r3tuyBCD6To7SX5oSJ2ScJZejwqK1ju8WdZfS" + } + }) + ); + } +} diff --git a/src/providers/s3/fetchers.rs b/src/providers/s3/fetchers.rs index 4028188..0d360f9 100644 --- a/src/providers/s3/fetchers.rs +++ b/src/providers/s3/fetchers.rs @@ -1,80 +1,6 @@ use std::str::FromStr; -use async_trait::async_trait; - -use super::{ - client::{GetObjectBytesError, ListCommonPrefixesError, S3Client}, - types, -}; - -#[derive(Clone, Debug)] -pub struct LakeS3Client { - s3: aws_sdk_s3::Client, -} - -impl LakeS3Client { - pub fn new(s3: aws_sdk_s3::Client) -> Self { - Self { s3 } - } - - pub fn from_conf(config: aws_sdk_s3::config::Config) -> Self { - let s3_client = aws_sdk_s3::Client::from_conf(config); - - Self { s3: s3_client } - } -} - -#[async_trait] -impl S3Client for LakeS3Client { - async fn get_object_bytes( - &self, - bucket: &str, - prefix: &str, - ) -> Result, GetObjectBytesError> { - let object = self - .s3 - .get_object() - .bucket(bucket) - .key(prefix) - .request_payer(aws_sdk_s3::types::RequestPayer::Requester) - .send() - .await?; - - let bytes = object.body.collect().await?.into_bytes().to_vec(); - - Ok(bytes) - } - - async fn list_common_prefixes( - &self, - bucket: &str, - start_after_prefix: &str, - ) -> Result, ListCommonPrefixesError> { - let response = self - .s3 - .list_objects_v2() - .max_keys(1000) // 1000 is the default and max value for this parameter - .delimiter("/".to_string()) - .start_after(start_after_prefix) - .request_payer(aws_sdk_s3::types::RequestPayer::Requester) - .bucket(bucket) - .send() - .await?; - - let prefixes = match response.common_prefixes { - None => vec![], - Some(common_prefixes) => common_prefixes - .into_iter() - .filter_map(|common_prefix| common_prefix.prefix) - .collect::>() - .into_iter() - .filter_map(|prefix_string| prefix_string.split('/').next().map(String::from)) - .collect(), - }; - - Ok(prefixes) - } -} +use super::{client::S3Client, types}; /// Queries the list of the objects in the bucket, grouped by "/" delimiter. /// Returns the list of block heights that can be fetched @@ -103,8 +29,8 @@ pub async fn list_block_heights( /// By the given block height gets the objects: /// - block.json /// - shard_N.json -/// Reads the content of the objects and parses as a JSON. -/// Returns the result in `near_indexer_primitives::StreamerMessage` +/// Reads the content of the objects and parses as a JSON. +/// Returns the result in `near_indexer_primitives::StreamerMessage` pub async fn fetch_streamer_message( lake_s3_client: &dyn S3Client, s3_bucket_name: &str, @@ -254,81 +180,3 @@ pub async fn fetch_shard_or_retry( } } } - -#[cfg(test)] -mod test { - use super::*; - - use std::sync::Arc; - - use async_trait::async_trait; - - #[derive(Clone, Debug)] - pub struct LakeS3Client {} - - #[async_trait] - impl S3Client for LakeS3Client { - async fn get_object_bytes( - &self, - _bucket: &str, - prefix: &str, - ) -> Result, GetObjectBytesError> { - let path = format!("{}/blocks/{}", env!("CARGO_MANIFEST_DIR"), prefix); - tokio::fs::read(path) - .await - .map_err(|e| GetObjectBytesError(Arc::new(e))) - } - - async fn list_common_prefixes( - &self, - _bucket: &str, - _start_after: &str, - ) -> Result, ListCommonPrefixesError> { - Ok(Vec::new()) - } - } - - #[tokio::test] - async fn deserializes_meta_transactions() { - let lake_client = LakeS3Client {}; - let streamer_message = - fetch_streamer_message(&lake_client, "near-lake-data-mainnet", 879765) - .await - .unwrap(); - - let delegate_action = &streamer_message.shards[0] - .chunk - .as_ref() - .unwrap() - .transactions[0] - .transaction - .actions[0]; - - assert_eq!( - serde_json::to_value(delegate_action).unwrap(), - serde_json::json!({ - "Delegate": { - "delegate_action": { - "sender_id": "test.near", - "receiver_id": "test.near", - "actions": [ - { - "AddKey": { - "public_key": "ed25519:CnQMksXTTtn81WdDujsEMQgKUMkFvDJaAjDeDLTxVrsg", - "access_key": { - "nonce": 0, - "permission": "FullAccess" - } - } - } - ], - "nonce": 879546, - "max_block_height": 100, - "public_key": "ed25519:8Rn4FJeeRYcrLbcrAQNFVgvbZ2FCEQjgydbXwqBwF1ib" - }, - "signature": "ed25519:25uGrsJNU3fVgUpPad3rGJRy2XQum8gJxLRjKFCbd7gymXwUxQ9r3tuyBCD6To7SX5oSJ2ScJZejwqK1ju8WdZfS" - } - }) - ); - } -} diff --git a/src/providers/s3/mod.rs b/src/providers/s3/mod.rs index aeeb2bc..f326d7c 100644 --- a/src/providers/s3/mod.rs +++ b/src/providers/s3/mod.rs @@ -106,14 +106,14 @@ pub(crate) async fn start( let lake_s3_client: Box = if let Some(s3_client) = config.s3_client { s3_client } else if let Some(config) = config.s3_config { - Box::new(fetchers::LakeS3Client::from_conf(config)) + Box::new(client::LakeS3Client::from_conf(config)) } else { let aws_config = aws_config::from_env().load().await; let s3_config = aws_sdk_s3::config::Builder::from(&aws_config) .region(aws_types::region::Region::new(config.s3_region_name)) .build(); - Box::new(fetchers::LakeS3Client::from_conf(s3_config)) + Box::new(client::LakeS3Client::from_conf(s3_config)) }; let mut last_processed_block_hash: Option = None;