From bb1ef294d20f264a2ff5beb6f1cb9aa850598347 Mon Sep 17 00:00:00 2001 From: Yurii Koba Date: Wed, 25 Sep 2024 15:39:26 +0300 Subject: [PATCH 1/8] refactoring and extend library to use different providers --- Cargo.toml | 4 +- src/lib.rs | 275 +----------------- src/providers/fastnear/client.rs | 1 + src/providers/fastnear/fetchers.rs | 1 + src/providers/fastnear/mod.rs | 3 + src/providers/fastnear/types.rs | 1 + src/providers/mod.rs | 2 + src/{s3_client.rs => providers/s3/client.rs} | 0 .../s3/fetchers.rs} | 30 +- src/providers/s3/mod.rs | 271 +++++++++++++++++ src/{ => providers/s3}/types.rs | 2 +- 11 files changed, 300 insertions(+), 290 deletions(-) create mode 100644 src/providers/fastnear/client.rs create mode 100644 src/providers/fastnear/fetchers.rs create mode 100644 src/providers/fastnear/mod.rs create mode 100644 src/providers/fastnear/types.rs create mode 100644 src/providers/mod.rs rename src/{s3_client.rs => providers/s3/client.rs} (100%) rename src/{s3_fetchers.rs => providers/s3/fetchers.rs} (91%) create mode 100644 src/providers/s3/mod.rs rename src/{ => providers/s3}/types.rs (98%) diff --git a/Cargo.toml b/Cargo.toml index 2ebd049..c641192 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ categories = ["asynchronous", "api-bindings", "network-programming"] keywords = ["near", "near-lake", "near-indexer"] authors = ["Near Inc "] edition = "2021" -rust-version = "1.75.0" +rust-version = "1.79.0" # cargo-workspaces [workspace.metadata.workspaces] @@ -32,7 +32,7 @@ tokio = { version = "1.35.1", features = ["sync", "time", "rt", "macros"] } tokio-stream = { version = "0.1.14" } tracing = "0.1.40" -near-indexer-primitives = "0.23.0" +near-indexer-primitives = "0.26.0" [lib] doctest = false diff --git a/src/lib.rs b/src/lib.rs index 11f711d..cac8c5d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -244,18 +244,14 @@ #[macro_use] extern crate derive_builder; -use futures::stream::StreamExt; use tokio::sync::mpsc; -use 
tokio::sync::mpsc::error::SendError; pub use near_indexer_primitives; pub use aws_credential_types::Credentials; -pub use types::{LakeConfig, LakeConfigBuilder}; +pub use providers::s3::types::{LakeConfig, LakeConfigBuilder}; -pub mod s3_client; -pub mod s3_fetchers; -pub(crate) mod types; +pub mod providers; pub(crate) const LAKE_FRAMEWORK: &str = "near_lake_framework"; @@ -285,270 +281,5 @@ pub fn streamer( mpsc::Receiver, ) { let (sender, receiver) = mpsc::channel(config.blocks_preload_pool_size); - (tokio::spawn(start(sender, config)), receiver) -} - -fn stream_block_heights<'a: 'b, 'b>( - lake_s3_client: &'a dyn s3_client::S3Client, - s3_bucket_name: &'a str, - mut start_from_block_height: crate::types::BlockHeight, -) -> impl futures::Stream + 'b { - async_stream::stream! { - loop { - tracing::debug!(target: LAKE_FRAMEWORK, "Fetching a list of blocks from S3..."); - match s3_fetchers::list_block_heights( - lake_s3_client, - s3_bucket_name, - start_from_block_height, - ) - .await { - Ok(block_heights) => { - if block_heights.is_empty() { - tracing::debug!( - target: LAKE_FRAMEWORK, - "There are no newer block heights than {} in bucket {}. Fetching again in 2s...", - start_from_block_height, - s3_bucket_name, - ); - tokio::time::sleep(std::time::Duration::from_secs(2)).await; - continue; - } - tracing::debug!( - target: LAKE_FRAMEWORK, - "Received {} newer block heights", - block_heights.len() - ); - - start_from_block_height = *block_heights.last().unwrap() + 1; - for block_height in block_heights { - tracing::debug!(target: LAKE_FRAMEWORK, "Yielding {} block height...", block_height); - yield block_height; - } - } - Err(err) => { - tracing::warn!( - target: LAKE_FRAMEWORK, - "Failed to get block heights from bucket {}: {}. 
Retrying in 1s...", - s3_bucket_name, - err, - ); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - } - } - } -} - -// The only consumer of the BlockHeights Streamer -async fn prefetch_block_heights_into_pool( - pending_block_heights: &mut std::pin::Pin< - &mut impl tokio_stream::Stream, - >, - limit: usize, - await_for_at_least_one: bool, -) -> anyhow::Result> { - let mut block_heights = Vec::with_capacity(limit); - for remaining_limit in (0..limit).rev() { - tracing::debug!(target: LAKE_FRAMEWORK, "Polling for the next block height without awaiting... (up to {} block heights are going to be fetched)", remaining_limit); - match futures::poll!(pending_block_heights.next()) { - std::task::Poll::Ready(Some(block_height)) => { - block_heights.push(block_height); - } - std::task::Poll::Pending => { - if await_for_at_least_one && block_heights.is_empty() { - tracing::debug!(target: LAKE_FRAMEWORK, "There were no block heights available immediatelly, and the prefetching blocks queue is empty, so we need to await for at least a single block height to be available before proceeding..."); - match pending_block_heights.next().await { - Some(block_height) => { - block_heights.push(block_height); - } - None => { - return Err(anyhow::anyhow!("This state should be unreachable as the block heights stream should be infinite.")); - } - } - continue; - } - tracing::debug!(target: LAKE_FRAMEWORK, "There were no block heights available immediatelly, so we should not block here and keep processing the blocks."); - break; - } - std::task::Poll::Ready(None) => { - return Err(anyhow::anyhow!("This state should be unreachable as the block heights stream should be infinite.")); - } - } - } - Ok(block_heights) -} - -#[allow(unused_labels)] // we use loop labels for code-readability -async fn start( - streamer_message_sink: mpsc::Sender, - config: LakeConfig, -) -> anyhow::Result<()> { - let mut start_from_block_height = config.start_block_height; - - let lake_s3_client: 
Box = - if let Some(s3_client) = config.s3_client { - s3_client - } else if let Some(config) = config.s3_config { - Box::new(s3_fetchers::LakeS3Client::from_conf(config)) - } else { - let aws_config = aws_config::from_env().load().await; - let s3_config = aws_sdk_s3::config::Builder::from(&aws_config) - .region(aws_types::region::Region::new(config.s3_region_name)) - .build(); - - Box::new(s3_fetchers::LakeS3Client::from_conf(s3_config)) - }; - - let mut last_processed_block_hash: Option = None; - - 'main: loop { - // In the beginning of the 'main' loop we create a Block Heights stream - // and prefetch the initial data in that pool. - // Later the 'stream' loop might exit to this 'main' one to repeat the procedure. - // This happens because we assume Lake Indexer that writes to the S3 Bucket might - // in some cases, write N+1 block before it finishes writing the N block. - // We require to stream blocks consistently, so we need to try to load the block again. - - let pending_block_heights = stream_block_heights( - &*lake_s3_client, - &config.s3_bucket_name, - start_from_block_height, - ); - tokio::pin!(pending_block_heights); - - let mut streamer_messages_futures = futures::stream::FuturesOrdered::new(); - tracing::debug!( - target: LAKE_FRAMEWORK, - "Prefetching up to {} blocks...", - config.blocks_preload_pool_size - ); - - streamer_messages_futures.extend( - prefetch_block_heights_into_pool( - &mut pending_block_heights, - config.blocks_preload_pool_size, - true, - ) - .await? - .into_iter() - .map(|block_height| { - s3_fetchers::fetch_streamer_message( - &*lake_s3_client, - &config.s3_bucket_name, - block_height, - ) - }), - ); - - tracing::debug!( - target: LAKE_FRAMEWORK, - "Awaiting for the first prefetched block..." 
- ); - 'stream: while let Some(streamer_message_result) = streamer_messages_futures.next().await { - let streamer_message = streamer_message_result.map_err(|err| { - tracing::error!( - target: LAKE_FRAMEWORK, - "Failed to fetch StreamerMessage with error: \n{:#?}", - err, - ); - err - })?; - - tracing::debug!( - target: LAKE_FRAMEWORK, - "Received block #{} ({})", - streamer_message.block.header.height, - streamer_message.block.header.hash - ); - // check if we have `last_processed_block_hash` (might be None only on start) - if let Some(prev_block_hash) = last_processed_block_hash { - // compare last_processed_block_hash` with `block.header.prev_hash` of the current - // block (ensure we don't miss anything from S3) - // retrieve the data from S3 if prev_hashes don't match and repeat the main loop step - if prev_block_hash != streamer_message.block.header.prev_hash { - tracing::warn!( - target: LAKE_FRAMEWORK, - "`prev_hash` does not match, refetching the data from S3 in 200ms", - ); - tokio::time::sleep(std::time::Duration::from_millis(200)).await; - break 'stream; - } - } - - // store current block info as `last_processed_block_*` for next iteration - last_processed_block_hash = Some(streamer_message.block.header.hash); - start_from_block_height = streamer_message.block.header.height + 1; - - tracing::debug!( - target: LAKE_FRAMEWORK, - "Prefetching up to {} blocks... 
(there are {} blocks in the prefetching pool)", - config.blocks_preload_pool_size, - streamer_messages_futures.len(), - ); - tracing::debug!( - target: LAKE_FRAMEWORK, - "Streaming block #{} ({})", - streamer_message.block.header.height, - streamer_message.block.header.hash - ); - let blocks_preload_pool_current_len = streamer_messages_futures.len(); - - let prefetched_block_heights_future = prefetch_block_heights_into_pool( - &mut pending_block_heights, - config - .blocks_preload_pool_size - .saturating_sub(blocks_preload_pool_current_len), - blocks_preload_pool_current_len == 0, - ); - - let streamer_message_sink_send_future = streamer_message_sink.send(streamer_message); - - let (prefetch_res, send_res): ( - Result, anyhow::Error>, - Result<_, SendError>, - ) = futures::join!( - prefetched_block_heights_future, - streamer_message_sink_send_future, - ); - - if let Err(SendError(err)) = send_res { - tracing::debug!( - target: LAKE_FRAMEWORK, - "Failed to send StreamerMessage (#{:0>12}) to the channel. Channel is closed, exiting \n{:?}", - start_from_block_height - 1, - err, - ); - return Ok(()); - } - - streamer_messages_futures.extend( - prefetch_res - .map_err(|err| { - tracing::error!( - target: LAKE_FRAMEWORK, - "Failed to prefetch block heights to the prefetching pool with error: \n{:#?}", - err - ); - err - })? - .into_iter() - .map(|block_height| { - s3_fetchers::fetch_streamer_message( - &*lake_s3_client, - &config.s3_bucket_name, - block_height, - ) - } - )); - } - - tracing::warn!( - target: LAKE_FRAMEWORK, - "Exited from the 'stream' loop. It may happen in two cases:\n - 1. Blocks has ended (impossible, might be an error on the Lake Buckets),\n - 2. 
Received a Block which prev_hash doesn't match the previously streamed block.\n - Will attempt to restart the stream from block #{}", - start_from_block_height, - ); - } + (tokio::spawn(providers::s3::start(sender, config)), receiver) } diff --git a/src/providers/fastnear/client.rs b/src/providers/fastnear/client.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/providers/fastnear/client.rs @@ -0,0 +1 @@ + diff --git a/src/providers/fastnear/fetchers.rs b/src/providers/fastnear/fetchers.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/providers/fastnear/fetchers.rs @@ -0,0 +1 @@ + diff --git a/src/providers/fastnear/mod.rs b/src/providers/fastnear/mod.rs new file mode 100644 index 0000000..18b7f21 --- /dev/null +++ b/src/providers/fastnear/mod.rs @@ -0,0 +1,3 @@ +pub mod client; +pub mod fetchers; +pub mod types; diff --git a/src/providers/fastnear/types.rs b/src/providers/fastnear/types.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/providers/fastnear/types.rs @@ -0,0 +1 @@ + diff --git a/src/providers/mod.rs b/src/providers/mod.rs new file mode 100644 index 0000000..fc5b561 --- /dev/null +++ b/src/providers/mod.rs @@ -0,0 +1,2 @@ +pub mod fastnear; +pub mod s3; diff --git a/src/s3_client.rs b/src/providers/s3/client.rs similarity index 100% rename from src/s3_client.rs rename to src/providers/s3/client.rs diff --git a/src/s3_fetchers.rs b/src/providers/s3/fetchers.rs similarity index 91% rename from src/s3_fetchers.rs rename to src/providers/s3/fetchers.rs index a29bb1e..d245cef 100644 --- a/src/s3_fetchers.rs +++ b/src/providers/s3/fetchers.rs @@ -2,7 +2,7 @@ use std::str::FromStr; use async_trait::async_trait; -use crate::s3_client::{GetObjectBytesError, ListCommonPrefixesError, S3Client}; +use super::{types, client::{GetObjectBytesError, ListCommonPrefixesError, S3Client}}; #[derive(Clone, Debug)] pub struct LakeS3Client { @@ -78,8 +78,8 @@ impl S3Client for LakeS3Client { pub async fn 
list_block_heights( lake_s3_client: &dyn S3Client, s3_bucket_name: &str, - start_from_block_height: crate::types::BlockHeight, -) -> Result, crate::types::LakeError> { + start_from_block_height: types::BlockHeight, +) -> Result, types::LakeError> { tracing::debug!( target: crate::LAKE_FRAMEWORK, "Fetching block heights from S3, after #{}...", @@ -105,8 +105,8 @@ pub async fn list_block_heights( pub(crate) async fn fetch_streamer_message( lake_s3_client: &dyn S3Client, s3_bucket_name: &str, - block_height: crate::types::BlockHeight, -) -> Result { + block_height: types::BlockHeight, +) -> Result { let block_view = fetch_block_or_retry(lake_s3_client, s3_bucket_name, block_height).await?; let fetch_shards_futures = (0..block_view.chunks.len() as u64) @@ -128,8 +128,8 @@ pub(crate) async fn fetch_streamer_message( pub async fn fetch_block( lake_s3_client: &dyn S3Client, s3_bucket_name: &str, - block_height: crate::types::BlockHeight, -) -> Result { + block_height: types::BlockHeight, +) -> Result { let bytes = lake_s3_client .get_object_bytes(s3_bucket_name, &format!("{:0>12}/block.json", block_height)) .await?; @@ -143,13 +143,13 @@ pub async fn fetch_block( pub async fn fetch_block_or_retry( lake_s3_client: &dyn S3Client, s3_bucket_name: &str, - block_height: crate::types::BlockHeight, -) -> Result { + block_height: types::BlockHeight, +) -> Result { loop { match fetch_block(lake_s3_client, s3_bucket_name, block_height).await { Ok(block_view) => break Ok(block_view), Err(err) => { - if let crate::types::LakeError::S3GetError { ref error } = err { + if let types::LakeError::S3GetError { ref error } = err { if let Some(get_object_error) = error.downcast_ref::() { @@ -188,9 +188,9 @@ pub async fn fetch_block_or_retry( pub async fn fetch_shard( lake_s3_client: &dyn S3Client, s3_bucket_name: &str, - block_height: crate::types::BlockHeight, + block_height: types::BlockHeight, shard_id: u64, -) -> Result { +) -> Result { let bytes = lake_s3_client .get_object_bytes( 
s3_bucket_name, @@ -207,14 +207,14 @@ pub async fn fetch_shard( pub async fn fetch_shard_or_retry( lake_s3_client: &dyn S3Client, s3_bucket_name: &str, - block_height: crate::types::BlockHeight, + block_height: types::BlockHeight, shard_id: u64, -) -> Result { +) -> Result { loop { match fetch_shard(lake_s3_client, s3_bucket_name, block_height, shard_id).await { Ok(shard) => break Ok(shard), Err(err) => { - if let crate::types::LakeError::S3ListError { ref error } = err { + if let types::LakeError::S3ListError { ref error } = err { if let Some(list_objects_error) = error.downcast_ref::() { diff --git a/src/providers/s3/mod.rs b/src/providers/s3/mod.rs new file mode 100644 index 0000000..aeeb2bc --- /dev/null +++ b/src/providers/s3/mod.rs @@ -0,0 +1,271 @@ +use futures::stream::StreamExt; +use tokio::sync::mpsc; +use tokio::sync::mpsc::error::SendError; + +pub mod client; +pub mod fetchers; +pub mod types; + +fn stream_block_heights<'a: 'b, 'b>( + lake_s3_client: &'a dyn client::S3Client, + s3_bucket_name: &'a str, + mut start_from_block_height: types::BlockHeight, +) -> impl futures::Stream + 'b { + async_stream::stream! { + loop { + tracing::debug!(target: crate::LAKE_FRAMEWORK, "Fetching a list of blocks from S3..."); + match fetchers::list_block_heights( + lake_s3_client, + s3_bucket_name, + start_from_block_height, + ) + .await { + Ok(block_heights) => { + if block_heights.is_empty() { + tracing::debug!( + target: crate::LAKE_FRAMEWORK, + "There are no newer block heights than {} in bucket {}. 
Fetching again in 2s...", + start_from_block_height, + s3_bucket_name, + ); + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + continue; + } + tracing::debug!( + target: crate::LAKE_FRAMEWORK, + "Received {} newer block heights", + block_heights.len() + ); + + start_from_block_height = *block_heights.last().unwrap() + 1; + for block_height in block_heights { + tracing::debug!(target: crate::LAKE_FRAMEWORK, "Yielding {} block height...", block_height); + yield block_height; + } + } + Err(err) => { + tracing::warn!( + target: crate::LAKE_FRAMEWORK, + "Failed to get block heights from bucket {}: {}. Retrying in 1s...", + s3_bucket_name, + err, + ); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + } + } + } +} + +// The only consumer of the BlockHeights Streamer +async fn prefetch_block_heights_into_pool( + pending_block_heights: &mut std::pin::Pin< + &mut impl tokio_stream::Stream, + >, + limit: usize, + await_for_at_least_one: bool, +) -> anyhow::Result> { + let mut block_heights = Vec::with_capacity(limit); + for remaining_limit in (0..limit).rev() { + tracing::debug!(target: crate::LAKE_FRAMEWORK, "Polling for the next block height without awaiting... 
(up to {} block heights are going to be fetched)", remaining_limit); + match futures::poll!(pending_block_heights.next()) { + std::task::Poll::Ready(Some(block_height)) => { + block_heights.push(block_height); + } + std::task::Poll::Pending => { + if await_for_at_least_one && block_heights.is_empty() { + tracing::debug!(target: crate::LAKE_FRAMEWORK, "There were no block heights available immediatelly, and the prefetching blocks queue is empty, so we need to await for at least a single block height to be available before proceeding..."); + match pending_block_heights.next().await { + Some(block_height) => { + block_heights.push(block_height); + } + None => { + return Err(anyhow::anyhow!("This state should be unreachable as the block heights stream should be infinite.")); + } + } + continue; + } + tracing::debug!(target: crate::LAKE_FRAMEWORK, "There were no block heights available immediatelly, so we should not block here and keep processing the blocks."); + break; + } + std::task::Poll::Ready(None) => { + return Err(anyhow::anyhow!("This state should be unreachable as the block heights stream should be infinite.")); + } + } + } + Ok(block_heights) +} + +#[allow(unused_labels)] // we use loop labels for code-readability +pub(crate) async fn start( + streamer_message_sink: mpsc::Sender, + config: types::LakeConfig, +) -> anyhow::Result<()> { + let mut start_from_block_height = config.start_block_height; + + let lake_s3_client: Box = if let Some(s3_client) = config.s3_client { + s3_client + } else if let Some(config) = config.s3_config { + Box::new(fetchers::LakeS3Client::from_conf(config)) + } else { + let aws_config = aws_config::from_env().load().await; + let s3_config = aws_sdk_s3::config::Builder::from(&aws_config) + .region(aws_types::region::Region::new(config.s3_region_name)) + .build(); + + Box::new(fetchers::LakeS3Client::from_conf(s3_config)) + }; + + let mut last_processed_block_hash: Option = None; + + 'main: loop { + // In the beginning of the 'main' 
loop we create a Block Heights stream + // and prefetch the initial data in that pool. + // Later the 'stream' loop might exit to this 'main' one to repeat the procedure. + // This happens because we assume Lake Indexer that writes to the S3 Bucket might + // in some cases, write N+1 block before it finishes writing the N block. + // We require to stream blocks consistently, so we need to try to load the block again. + + let pending_block_heights = stream_block_heights( + &*lake_s3_client, + &config.s3_bucket_name, + start_from_block_height, + ); + tokio::pin!(pending_block_heights); + + let mut streamer_messages_futures = futures::stream::FuturesOrdered::new(); + tracing::debug!( + target: crate::LAKE_FRAMEWORK, + "Prefetching up to {} blocks...", + config.blocks_preload_pool_size + ); + + streamer_messages_futures.extend( + prefetch_block_heights_into_pool( + &mut pending_block_heights, + config.blocks_preload_pool_size, + true, + ) + .await? + .into_iter() + .map(|block_height| { + fetchers::fetch_streamer_message( + &*lake_s3_client, + &config.s3_bucket_name, + block_height, + ) + }), + ); + + tracing::debug!( + target: crate::LAKE_FRAMEWORK, + "Awaiting for the first prefetched block..." 
+ ); + 'stream: while let Some(streamer_message_result) = streamer_messages_futures.next().await { + let streamer_message = streamer_message_result.map_err(|err| { + tracing::error!( + target: crate::LAKE_FRAMEWORK, + "Failed to fetch StreamerMessage with error: \n{:#?}", + err, + ); + err + })?; + + tracing::debug!( + target: crate::LAKE_FRAMEWORK, + "Received block #{} ({})", + streamer_message.block.header.height, + streamer_message.block.header.hash + ); + // check if we have `last_processed_block_hash` (might be None only on start) + if let Some(prev_block_hash) = last_processed_block_hash { + // compare `last_processed_block_hash` with `block.header.prev_hash` of the current + // block (ensure we don't miss anything from S3) + // retrieve the data from S3 if prev_hashes don't match and repeat the main loop step + if prev_block_hash != streamer_message.block.header.prev_hash { + tracing::warn!( + target: crate::LAKE_FRAMEWORK, + "`prev_hash` does not match, refetching the data from S3 in 200ms", + ); + tokio::time::sleep(std::time::Duration::from_millis(200)).await; + break 'stream; + } + } + + // store current block info as `last_processed_block_*` for next iteration + last_processed_block_hash = Some(streamer_message.block.header.hash); + start_from_block_height = streamer_message.block.header.height + 1; + + tracing::debug!( + target: crate::LAKE_FRAMEWORK, + "Prefetching up to {} blocks... 
(there are {} blocks in the prefetching pool)", + config.blocks_preload_pool_size, + streamer_messages_futures.len(), + ); + tracing::debug!( + target: crate::LAKE_FRAMEWORK, + "Streaming block #{} ({})", + streamer_message.block.header.height, + streamer_message.block.header.hash + ); + let blocks_preload_pool_current_len = streamer_messages_futures.len(); + + let prefetched_block_heights_future = prefetch_block_heights_into_pool( + &mut pending_block_heights, + config + .blocks_preload_pool_size + .saturating_sub(blocks_preload_pool_current_len), + blocks_preload_pool_current_len == 0, + ); + + let streamer_message_sink_send_future = streamer_message_sink.send(streamer_message); + + let (prefetch_res, send_res): ( + Result, anyhow::Error>, + Result<_, SendError>, + ) = futures::join!( + prefetched_block_heights_future, + streamer_message_sink_send_future, + ); + + if let Err(SendError(err)) = send_res { + tracing::debug!( + target: crate::LAKE_FRAMEWORK, + "Failed to send StreamerMessage (#{:0>12}) to the channel. Channel is closed, exiting \n{:?}", + start_from_block_height - 1, + err, + ); + return Ok(()); + } + + streamer_messages_futures.extend( + prefetch_res + .map_err(|err| { + tracing::error!( + target: crate::LAKE_FRAMEWORK, + "Failed to prefetch block heights to the prefetching pool with error: \n{:#?}", + err + ); + err + })? + .into_iter() + .map(|block_height| { + fetchers::fetch_streamer_message( + &*lake_s3_client, + &config.s3_bucket_name, + block_height, + ) + } + )); + } + + tracing::warn!( + target: crate::LAKE_FRAMEWORK, + "Exited from the 'stream' loop. It may happen in two cases:\n + 1. Blocks has ended (impossible, might be an error on the Lake Buckets),\n + 2. 
Received a Block which prev_hash doesn't match the previously streamed block.\n + Will attempt to restart the stream from block #{}", + start_from_block_height, + ); + } +} diff --git a/src/types.rs b/src/providers/s3/types.rs similarity index 98% rename from src/types.rs rename to src/providers/s3/types.rs index 5a0e5b3..da76a94 100644 --- a/src/types.rs +++ b/src/providers/s3/types.rs @@ -1,4 +1,4 @@ -use crate::s3_client::{GetObjectBytesError, ListCommonPrefixesError, S3Client}; +use super::client::{GetObjectBytesError, ListCommonPrefixesError, S3Client}; /// Type alias represents the block height pub type BlockHeight = u64; From b8753cddd0cd82aebc59f0db7935481f2dd12874 Mon Sep 17 00:00:00 2001 From: Yurii Koba Date: Wed, 25 Sep 2024 17:23:58 +0300 Subject: [PATCH 2/8] add fastnear provider --- Cargo.toml | 1 + src/lib.rs | 19 +++-- src/providers/fastnear/client.rs | 47 +++++++++++ src/providers/fastnear/fetchers.rs | 124 ++++++++++++++++++++++++++++ src/providers/fastnear/mod.rs | 72 +++++++++++++++++ src/providers/fastnear/types.rs | 126 +++++++++++++++++++++++++++++ src/providers/mod.rs | 26 ++++++ src/providers/s3/fetchers.rs | 5 +- 8 files changed, 413 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c641192..2e04dee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ async-stream = "0.3.5" async-trait = "0.1.77" derive_builder = "0.13.0" futures = "0.3.30" +reqwest = { version = "0.12.7", features = ["json"] } serde = { version = "1.0.195", features = ["derive"] } serde_json = "1.0.111" thiserror = "1.0.56" diff --git a/src/lib.rs b/src/lib.rs index cac8c5d..683da79 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -244,11 +244,10 @@ #[macro_use] extern crate derive_builder; -use tokio::sync::mpsc; - pub use near_indexer_primitives; pub use aws_credential_types::Credentials; +pub use providers::fastnear::types::{FastNearConfig, FastNearConfigBuilder}; pub use providers::s3::types::{LakeConfig, LakeConfigBuilder}; pub mod providers; 
@@ -275,11 +274,19 @@ pub(crate) const LAKE_FRAMEWORK: &str = "near_lake_framework"; /// # } /// ``` pub fn streamer( - config: LakeConfig, + config: providers::NearLakeFrameworkConfig, ) -> ( tokio::task::JoinHandle>, - mpsc::Receiver, + tokio::sync::mpsc::Receiver, ) { - let (sender, receiver) = mpsc::channel(config.blocks_preload_pool_size); - (tokio::spawn(providers::s3::start(sender, config)), receiver) + let (sender, receiver) = tokio::sync::mpsc::channel(config.blocks_preload_pool_size()); + match config { + providers::NearLakeFrameworkConfig::Lake(config) => { + (tokio::spawn(providers::s3::start(sender, config)), receiver) + } + providers::NearLakeFrameworkConfig::FastNear(config) => ( + tokio::spawn(providers::fastnear::start(sender, config)), + receiver, + ), + } } diff --git a/src/providers/fastnear/client.rs b/src/providers/fastnear/client.rs index 8b13789..80ebb49 100644 --- a/src/providers/fastnear/client.rs +++ b/src/providers/fastnear/client.rs @@ -1 +1,48 @@ +use super::types; +#[derive(Clone, Debug)] +pub struct FastNearClient { + client: reqwest::Client, + endpoint: String, +} + +impl FastNearClient { + pub fn new(config: &types::FastNearConfig) -> Self { + Self { + endpoint: config.endpoint.clone(), + client: reqwest::Client::new(), + } + } + + pub async fn fetch( + &self, + url_path: &str, + ) -> Result, types::FastNearError> { + let url = format!("{}{}", self.endpoint, url_path); + let response = self.client.get(&url).send().await?; + match response.status().as_u16() { + 200 => Ok(response.json().await?), + 404 => Err(response.json::().await?.into()), + _ => Err(types::FastNearError::UnknownError(format!( + "Unexpected status code: {}, Response: {}", + response.status(), + response.text().await? 
+ ))), + } + } + + pub async fn fetch_until_success( + &self, + url_path: &str, + ) -> Option { + loop { + match self.fetch(url_path).await { + Ok(block) => return block, + Err(err) => { + tracing::warn!(target: crate::LAKE_FRAMEWORK, "Failed to fetch block: {}", err); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + } + } + } +} diff --git a/src/providers/fastnear/fetchers.rs b/src/providers/fastnear/fetchers.rs index 8b13789..7284a78 100644 --- a/src/providers/fastnear/fetchers.rs +++ b/src/providers/fastnear/fetchers.rs @@ -1 +1,125 @@ +use super::client::FastNearClient; +use super::types; +pub async fn fetch_last_block(client: &FastNearClient) -> near_indexer_primitives::StreamerMessage { + client + .fetch_until_success("/v0/last_block/final") + .await + .expect("Failed to fetch last block") +} + +pub async fn fetch_first_block( + client: &FastNearClient, +) -> near_indexer_primitives::StreamerMessage { + client + .fetch_until_success("/v0/first_block") + .await + .expect("Failed to fetch first block") +} + +pub async fn fetch_streamer_message( + client: &FastNearClient, + block_height: types::BlockHeight, +) -> Option { + client + .fetch_until_success(&format!("/v0/block/{}", block_height)) + .await +} + +pub async fn fetch_block( + client: &FastNearClient, + block_height: types::BlockHeight, +) -> Result { + let streamer_message = client.fetch(&format!("/v0/block/{}", block_height)).await?; + if let Some(msg) = streamer_message { + Ok(msg.block) + } else { + Err(types::FastNearError::BlockDoesNotExist(format!( + "Block {} does not exist", + block_height + ))) + } +} + +pub async fn fetch_block_or_retry( + client: &FastNearClient, + block_height: types::BlockHeight, +) -> Result { + let streamer_message = client + .fetch_until_success(&format!("/v0/block/{}", block_height)) + .await; + if let Some(msg) = streamer_message { + Ok(msg.block) + } else { + Err(types::FastNearError::BlockDoesNotExist(format!( + "Block {} does not exist", + 
block_height + ))) + } +} + +pub async fn fetch_shard( + client: &FastNearClient, + block_height: types::BlockHeight, + shard_id: u64, +) -> Result { + let streamer_message = client.fetch(&format!("/v0/block/{}", block_height)).await?; + if let Some(msg) = streamer_message { + Ok(msg + .shards + .iter() + .filter_map(|shard| { + if shard.shard_id == shard_id { + Some(shard.clone()) + } else { + None + } + }) + .next() + .ok_or_else(|| { + types::FastNearError::BlockDoesNotExist(format!( + "Block {} and shard {} does not exist", + block_height, shard_id + )) + })?) + } else { + Err(types::FastNearError::BlockDoesNotExist(format!( + "Block {} does not exist", + block_height + ))) + } +} + +pub async fn fetch_shard_or_retry( + client: &FastNearClient, + block_height: types::BlockHeight, + shard_id: u64, +) -> Result { + let streamer_message = client + .fetch_until_success(&format!("/v0/block/{}", block_height)) + .await; + if let Some(msg) = streamer_message { + Ok(msg + .shards + .iter() + .filter_map(|shard| { + if shard.shard_id == shard_id { + Some(shard.clone()) + } else { + None + } + }) + .next() + .ok_or_else(|| { + types::FastNearError::BlockDoesNotExist(format!( + "Block {} and shard {} does not exist", + block_height, shard_id + )) + })?) 
+ } else { + Err(types::FastNearError::BlockDoesNotExist(format!( + "Block {} does not exist", + block_height + ))) + } +} diff --git a/src/providers/fastnear/mod.rs b/src/providers/fastnear/mod.rs index 18b7f21..224ec7a 100644 --- a/src/providers/fastnear/mod.rs +++ b/src/providers/fastnear/mod.rs @@ -1,3 +1,75 @@ pub mod client; pub mod fetchers; pub mod types; + +#[allow(unused_labels)] // we use loop labels for code-readability +pub async fn start( + blocks_sink: tokio::sync::mpsc::Sender, + config: types::FastNearConfig, +) -> anyhow::Result<()> { + let client = client::FastNearClient::new(&config); + let max_num_threads = config.num_threads; + let next_sink_block = + std::sync::Arc::new(std::sync::atomic::AtomicU64::new(config.start_block_height)); + 'main: loop { + // In the beginning of the 'main' loop, we fetch the next block height to start fetching from + let start_block_height = next_sink_block.load(std::sync::atomic::Ordering::SeqCst); + let next_fetch_block = + std::sync::Arc::new(std::sync::atomic::AtomicU64::new(start_block_height)); + let last_block_height = fetchers::fetch_last_block(&client) + .await + .block + .header + .height; + let is_backfill = last_block_height > start_block_height + max_num_threads; + let num_threads = if is_backfill { max_num_threads } else { 1 }; + tracing::info!( + target: crate::LAKE_FRAMEWORK, + "Start fetching from block {} to block {} with {} threads. 
Backfill: {:?}", + start_block_height, + last_block_height, + num_threads, + is_backfill + ); + // starting backfill with multiple threads + let handles = (0..num_threads) + .map(|thread_index| { + let client = client.clone(); + let blocks_sink = blocks_sink.clone(); + let next_fetch_block = next_fetch_block.clone(); + let next_sink_block = next_sink_block.clone(); + tokio::spawn(async move { + 'stream: loop { + let block_height = next_fetch_block.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + if is_backfill && block_height > last_block_height { + break 'stream; + } + tracing::debug!(target: crate::LAKE_FRAMEWORK, "#{}: Fetching block: {}", thread_index, block_height); + let block = + fetchers::fetch_streamer_message(&client, block_height).await; + 'sender: loop { + let expected_block_height = next_sink_block.load(std::sync::atomic::Ordering::SeqCst); + if expected_block_height < block_height { + tokio::time::sleep(std::time::Duration::from_millis( + block_height - expected_block_height, + )).await; + } else { + tracing::debug!(target: crate::LAKE_FRAMEWORK, "#{}: Sending block: {}", thread_index, block_height); + break 'sender; + } + } + if let Some(block) = block { + blocks_sink.send(block).await.expect("Failed to send block"); + } else { + tracing::debug!(target: crate::LAKE_FRAMEWORK, "#{}: Skipped block: {}", thread_index, block_height); + } + next_sink_block.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + } + }) + }) + .collect::>(); + for handle in handles { + handle.await?; + } + } +} diff --git a/src/providers/fastnear/types.rs b/src/providers/fastnear/types.rs index 8b13789..a713e47 100644 --- a/src/providers/fastnear/types.rs +++ b/src/providers/fastnear/types.rs @@ -1 +1,127 @@ +/// Type alias represents the block height +pub type BlockHeight = u64; +/// Configuration struct for Fast NEAR Data Framework +/// NB! 
Consider using [`FastNearConfigBuilder`] +/// Building the `FastNearConfig` example: +/// ``` +/// use fastnear_data_framework::FastNearConfigBuilder; +/// +/// # async fn main() { +/// let config = FastNearConfigBuilder::default() +/// .testnet() +/// .start_block_height(82422587) +/// .build() +/// .expect("Failed to build FastNearConfig"); +/// # } +/// ``` +#[derive(Default, Builder)] +#[builder(pattern = "owned")] +pub struct FastNearConfig { + /// Fastnear data endpoint + #[builder(setter(into))] + pub(crate) endpoint: String, + /// Defines the block height to start indexing from + pub(crate) start_block_height: u64, + /// Number of threads to use for fetching data + /// Default: 2 * available threads + #[builder(default = "num_threads_default()")] + pub(crate) num_threads: u64, + #[builder(default = "100")] + pub(crate) blocks_preload_pool_size: usize, +} + +impl FastNearConfigBuilder { + /// Shortcut to set up [FastNearConfigBuilder] for mainnet + /// ``` + /// use fastnear_data_framework::FastNearConfigBuilder; + /// + /// # async fn main() { + /// let config = FastNearConfigBuilder::default() + /// .mainnet() + /// .start_block_height(65231161) + /// .build() + /// .expect("Failed to build FastNearConfig"); + /// # } + /// ``` + pub fn mainnet(mut self) -> Self { + self.endpoint = Some("https://mainnet.neardata.xyz".to_string()); + self + } + + /// Shortcut to set up [FastNearConfigBuilder] for testnet + /// ``` + /// use fastnear_data_framework::FastNearConfigBuilder; + /// + /// # async fn main() { + /// let config = FastNearConfigBuilder::default() + /// .testnet() + /// .start_block_height(82422587) + /// .build() + /// .expect("Failed to build FastNearConfig"); + /// # } + /// ``` + pub fn testnet(mut self) -> Self { + self.endpoint = Some("https://testnet.neardata.xyz".to_string()); + self + } +} + +/// Shortcut to set up [FastNearConfigBuilder] num_threads +/// ``` +/// use fastnear_data_framework::FastNearConfigBuilder; +/// +/// # async fn main() 
{ +/// let config = FastNearConfigBuilder::default() +/// .mainnet() +/// .num_threads(8) +/// .start_block_height(82422587) +/// .build() +/// .expect("Failed to build FastNearConfig"); +/// # } +/// ``` +fn num_threads_default() -> u64 { + // Default to 2 threads if we can't get the number of available threads + let threads = + std::thread::available_parallelism().map_or(2, std::num::NonZeroUsize::get) as u64; + // Double the number of threads to fetch data and process it concurrently in the streamer + threads * 2 +} + +#[derive(Debug, thiserror::Error)] +pub enum FastNearError { + #[error("Block height too high: {0}")] + BlockHeightTooHigh(String), + #[error("Block height too low: {0}")] + BlockHeightTooLow(String), + #[error("Block does not exist: {0}")] + BlockDoesNotExist(String), + #[error("Request error: {0}")] + RequestError(reqwest::Error), + #[error("An unknown error occurred: {0}")] + UnknownError(String), +} + +impl From for FastNearError { + fn from(error: reqwest::Error) -> Self { + FastNearError::RequestError(error) + } +} + +#[derive(Debug, serde::Deserialize)] +pub(crate) struct ErrorResponse { + error: String, + #[serde(rename = "type")] + error_type: String, +} + +impl From for FastNearError { + fn from(response: ErrorResponse) -> Self { + match response.error_type.as_str() { + "BLOCK_DOES_NOT_EXIST" => FastNearError::BlockDoesNotExist(response.error), + "BLOCK_HEIGHT_TOO_HIGH" => FastNearError::BlockHeightTooHigh(response.error), + "BLOCK_HEIGHT_TOO_LOW" => FastNearError::BlockHeightTooLow(response.error), + _ => FastNearError::UnknownError(response.error), + } + } +} diff --git a/src/providers/mod.rs b/src/providers/mod.rs index fc5b561..056ae8c 100644 --- a/src/providers/mod.rs +++ b/src/providers/mod.rs @@ -1,2 +1,28 @@ pub mod fastnear; pub mod s3; + +pub enum NearLakeFrameworkConfig { + Lake(s3::types::LakeConfig), + FastNear(fastnear::types::FastNearConfig), +} + +impl NearLakeFrameworkConfig { + pub fn blocks_preload_pool_size(&self) -> 
usize { + match self { + NearLakeFrameworkConfig::Lake(config) => config.blocks_preload_pool_size, + NearLakeFrameworkConfig::FastNear(config) => config.blocks_preload_pool_size, + } + } +} + +impl From for NearLakeFrameworkConfig { + fn from(config: s3::types::LakeConfig) -> Self { + NearLakeFrameworkConfig::Lake(config) + } +} + +impl From for NearLakeFrameworkConfig { + fn from(config: fastnear::types::FastNearConfig) -> Self { + NearLakeFrameworkConfig::FastNear(config) + } +} diff --git a/src/providers/s3/fetchers.rs b/src/providers/s3/fetchers.rs index d245cef..8f82bfe 100644 --- a/src/providers/s3/fetchers.rs +++ b/src/providers/s3/fetchers.rs @@ -2,7 +2,10 @@ use std::str::FromStr; use async_trait::async_trait; -use super::{types, client::{GetObjectBytesError, ListCommonPrefixesError, S3Client}}; +use super::{ + client::{GetObjectBytesError, ListCommonPrefixesError, S3Client}, + types, +}; #[derive(Clone, Debug)] pub struct LakeS3Client { From f82ab8c14990c3f12c2ccfbc6b9bd7549e5f3c7e Mon Sep 17 00:00:00 2001 From: Yurii Koba Date: Thu, 26 Sep 2024 13:36:21 +0300 Subject: [PATCH 3/8] fix tests --- Cargo.toml | 5 ++++- README.md | 2 +- src/lib.rs | 4 ++-- src/providers/fastnear/types.rs | 8 ++++---- src/providers/s3/fetchers.rs | 1 - 5 files changed, 11 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2e04dee..e214c90 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,10 @@ tokio = { version = "1.35.1", features = ["sync", "time", "rt", "macros"] } tokio-stream = { version = "0.1.14" } tracing = "0.1.40" -near-indexer-primitives = "0.26.0" +# Bug with deserialization of the transactions it should be fixed in the next release 0.27.0 +# near-indexer-primitives = "0.26.0" +# for now we use the forked version +near-indexer-primitives = { git = 'https://github.com/kobayurii/nearcore.git', branch = "2.2.1-fork1" } [lib] doctest = false diff --git a/README.md b/README.md index 6ef9dde..bd9a03e 100644 --- a/README.md +++ b/README.md @@ 
-33,7 +33,7 @@ async fn main() -> Result<(), tokio::io::Error> { .expect("Failed to build LakeConfig"); // instantiate the NEAR Lake Framework Stream - let (sender, stream) = near_lake_framework::streamer(config); + let (sender, stream) = near_lake_framework::streamer(config.into()); // read the stream events and pass them to a handler function with // concurrency 1 diff --git a/src/lib.rs b/src/lib.rs index 683da79..5fa1752 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,7 +20,7 @@ //! .expect("Failed to build LakeConfig"); //! //! // instantiate the NEAR Lake Framework Stream -//! let (sender, stream) = near_lake_framework::streamer(config); +//! let (sender, stream) = near_lake_framework::streamer(config.into()); //! //! // read the stream events and pass them to a handler function with //! // concurrency 1 @@ -266,7 +266,7 @@ pub(crate) const LAKE_FRAMEWORK: &str = "near_lake_framework"; /// .build() /// .expect("Failed to build LakeConfig"); /// -/// let (_, stream) = near_lake_framework::streamer(config); +/// let (_, stream) = near_lake_framework::streamer(config.into()); /// /// while let Some(streamer_message) = stream.recv().await { /// eprintln!("{:#?}", streamer_message); diff --git a/src/providers/fastnear/types.rs b/src/providers/fastnear/types.rs index a713e47..eafc7ab 100644 --- a/src/providers/fastnear/types.rs +++ b/src/providers/fastnear/types.rs @@ -5,7 +5,7 @@ pub type BlockHeight = u64; /// NB! 
Consider using [`FastNearConfigBuilder`] /// Building the `FastNearConfig` example: /// ``` -/// use fastnear_data_framework::FastNearConfigBuilder; +/// use near_lake_framework::FastNearConfigBuilder; /// /// # async fn main() { /// let config = FastNearConfigBuilder::default() @@ -34,7 +34,7 @@ pub struct FastNearConfig { impl FastNearConfigBuilder { /// Shortcut to set up [FastNearConfigBuilder] for mainnet /// ``` - /// use fastnear_data_framework::FastNearConfigBuilder; + /// use near_lake_framework::FastNearConfigBuilder; /// /// # async fn main() { /// let config = FastNearConfigBuilder::default() @@ -51,7 +51,7 @@ impl FastNearConfigBuilder { /// Shortcut to set up [FastNearConfigBuilder] for testnet /// ``` - /// use fastnear_data_framework::FastNearConfigBuilder; + /// use near_lake_framework::FastNearConfigBuilder; /// /// # async fn main() { /// let config = FastNearConfigBuilder::default() @@ -69,7 +69,7 @@ impl FastNearConfigBuilder { /// Shortcut to set up [FastNearConfigBuilder] num_threads /// ``` -/// use fastnear_data_framework::FastNearConfigBuilder; +/// use near_lake_framework::FastNearConfigBuilder; /// /// # async fn main() { /// let config = FastNearConfigBuilder::default() diff --git a/src/providers/s3/fetchers.rs b/src/providers/s3/fetchers.rs index 8f82bfe..76e5ba5 100644 --- a/src/providers/s3/fetchers.rs +++ b/src/providers/s3/fetchers.rs @@ -291,7 +291,6 @@ mod test { #[tokio::test] async fn deserializes_meta_transactions() { let lake_client = LakeS3Client {}; - let streamer_message = fetch_streamer_message(&lake_client, "near-lake-data-mainnet", 879765) .await From 9ae9e69779e1f8f84b00dbeb85d19bd6bd5ce014 Mon Sep 17 00:00:00 2001 From: Yurii Koba Date: Tue, 1 Oct 2024 14:20:38 +0300 Subject: [PATCH 4/8] add docstrings and documentation --- CHANGELOG.md | 20 +++++++++++++- README.md | 37 +++++++++++++++++++++++++- src/lib.rs | 42 +++++++++++++++++++++++++++--- src/providers/fastnear/client.rs | 10 +++++++ 
src/providers/fastnear/fetchers.rs | 30 +++++++++++++++++++++ src/providers/fastnear/mod.rs | 5 ++++ src/providers/s3/fetchers.rs | 2 +- 7 files changed, 139 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 13f16aa..ba317ee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,25 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [Unreleased](https://github.com/near/near-lake-framework/compare/v0.7.9...HEAD) +## [Unreleased](https://github.com/near/near-lake-framework/compare/v0.7.10...HEAD) + +## [0.7.10](https://github.com/near/near-lake-framework/compare/v0.7.9...0.7.10) + +* Upgrade `near-indexer-primitives` to `0.27.0` (nearcore-2.3.0) +* Added new provider `fastnear` - a new way to get the data from NEAR Protocol. It is a separate service that provides the data in a more efficient way. Check the [FastNear](https://fastnear.com/) provider for more details. + +### Breaking Change + +* `s3_fetchers` rename to `fetchers` and move to the `providers` module. New usage example: + ```rust + use near_lake_framework::providers::s3::fetchers::fetch_streamer_message; + use near_lake_framework::providers::fastnear::fetchers::fetch_streamer_message; + ``` +* `s3_client` rename to `client` and move to the `providers` module. 
New usage example: + ```rust + use near_lake_framework::providers::s3::client::S3Client; + use near_lake_framework::providers::fastnear::client::FastNearClient; + ``` ## [0.7.9](https://github.com/near/near-lake-framework/compare/v0.7.7...0.7.9) diff --git a/README.md b/README.md index bd9a03e..45a35b6 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,7 @@ async fn main() -> Result<(), tokio::io::Error> { .expect("Failed to build LakeConfig"); // instantiate the NEAR Lake Framework Stream - let (sender, stream) = near_lake_framework::streamer(config.into()); + let (sender, stream) = near_lake_framework::streamer(config); // read the stream events and pass them to a handler function with // concurrency 1 @@ -148,6 +148,41 @@ $5.7 + $14.1 = $19.8 The price depends on the number of shards +## FastNear provider + +FastNear provides a service to access the NEAR Protocol data + +- [FastNear](https://fastnear.com/) + +FastNear provider is a new way to get the data from NEAR Protocol. It is a separate service that provides the data in a more efficient way. + +### How to use it: + +```rust +use futures::StreamExt; +use near_lake_framework::FastNearConfigBuilder; + +#[tokio::main] +async fn main() { + + let config = FastNearConfigBuilder::default() + .testnet() + .start_block_height(82422587) + .build() + .expect("Failed to build LakeConfig"); + + let (_, stream) = near_lake_framework::streamer(config); + + while let Some(streamer_message) = stream.recv().await { + eprintln!("{:#?}", streamer_message); + } +} +``` +### How to migrate from Lake to FastNear: + +- Replace `LakeConfigBuilder` with `FastNearConfigBuilder` +- Replace `LakeConfig` with `FastNearConfig` + ## Future plans We use Milestones with clearly defined acceptance criteria: diff --git a/src/lib.rs b/src/lib.rs index 5fa1752..885e2ec 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,7 +20,7 @@ //! .expect("Failed to build LakeConfig"); //! //! // instantiate the NEAR Lake Framework Stream -//! 
let (sender, stream) = near_lake_framework::streamer(config.into()); +//! let (sender, stream) = near_lake_framework::streamer(config); //! //! // read the stream events and pass them to a handler function with //! // concurrency 1 @@ -233,6 +233,39 @@ //! //! The price depends on the number of shards //! +//! ### FastNear provider +//! +//! FastNear provider is a new way to get the data from NEAR Protocol. It is a separate service that provides the data in a more efficient way. +//! +//! How to use it: +//! +//! ## Example +//! +//! ```rust +//! use futures::StreamExt; +//! use near_lake_framework::FastNearConfigBuilder; +//! +//! #[tokio::main] +//! # async fn main() { +//! let config = FastNearConfigBuilder::default() +//! .testnet() +//! .start_block_height(82422587) +//! .build() +//! .expect("Failed to build LakeConfig"); +//! +//! let (_, stream) = near_lake_framework::streamer(config); +//! +//! while let Some(streamer_message) = stream.recv().await { +//! eprintln!("{:#?}", streamer_message); +//! } +//! # } +//! ``` +//! +//! How to migrate from Lake to FastNear: +//! +//! - Replace `LakeConfigBuilder` with `FastNearConfigBuilder` +//! - Replace `LakeConfig` with `FastNearConfig` +//! //! ## Future plans //! //! 
We use Milestones with clearly defined acceptance criteria: @@ -266,19 +299,20 @@ pub(crate) const LAKE_FRAMEWORK: &str = "near_lake_framework"; /// .build() /// .expect("Failed to build LakeConfig"); /// -/// let (_, stream) = near_lake_framework::streamer(config.into()); +/// let (_, stream) = near_lake_framework::streamer(config); /// /// while let Some(streamer_message) = stream.recv().await { /// eprintln!("{:#?}", streamer_message); /// } /// # } /// ``` -pub fn streamer( - config: providers::NearLakeFrameworkConfig, +pub fn streamer>( + config: T, ) -> ( tokio::task::JoinHandle>, tokio::sync::mpsc::Receiver, ) { + let config: providers::NearLakeFrameworkConfig = config.into(); let (sender, receiver) = tokio::sync::mpsc::channel(config.blocks_preload_pool_size()); match config { providers::NearLakeFrameworkConfig::Lake(config) => { diff --git a/src/providers/fastnear/client.rs b/src/providers/fastnear/client.rs index 80ebb49..103884a 100644 --- a/src/providers/fastnear/client.rs +++ b/src/providers/fastnear/client.rs @@ -1,5 +1,7 @@ use super::types; +/// FastNearClient is a client to interact with the FastNear API +/// It is used to fetch the blocks from the FastNear #[derive(Clone, Debug)] pub struct FastNearClient { client: reqwest::Client, @@ -14,6 +16,10 @@ impl FastNearClient { } } + /// Fetches the block from the FastNear API + /// Returns the result in `Option` + /// If the block does not exist, returns `None` + /// If the request fails, returns an error pub async fn fetch( &self, url_path: &str, @@ -31,6 +37,10 @@ impl FastNearClient { } } + /// Fetches the block from the FastNear API until it succeeds + /// It retries fetching the block until it gets a successful response + /// Returns the result in `Option` + /// If the block does not exist, returns `None` pub async fn fetch_until_success( &self, url_path: &str, diff --git a/src/providers/fastnear/fetchers.rs b/src/providers/fastnear/fetchers.rs index 7284a78..25fb813 100644 --- 
a/src/providers/fastnear/fetchers.rs +++ b/src/providers/fastnear/fetchers.rs @@ -1,6 +1,8 @@ use super::client::FastNearClient; use super::types; +/// Fetches the last block from the fastenar +/// Returns `near_indexer_primitives::StreamerMessage` pub async fn fetch_last_block(client: &FastNearClient) -> near_indexer_primitives::StreamerMessage { client .fetch_until_success("/v0/last_block/final") @@ -8,6 +10,19 @@ pub async fn fetch_last_block(client: &FastNearClient) -> near_indexer_primitive .expect("Failed to fetch last block") } +/// Fetches the optimistic block from the fastenar +/// Returns `near_indexer_primitives::StreamerMessage` +pub async fn fetch_optimistic_block( + client: &FastNearClient, +) -> near_indexer_primitives::StreamerMessage { + client + .fetch_until_success("/v0/last_block/optimistic") + .await + .expect("Failed to fetch optimistic block") +} + +/// Fetches the genesis block from the fastenar +/// Returns `near_indexer_primitives::StreamerMessage` pub async fn fetch_first_block( client: &FastNearClient, ) -> near_indexer_primitives::StreamerMessage { @@ -17,6 +32,9 @@ pub async fn fetch_first_block( .expect("Failed to fetch first block") } +/// Fetches the block data from the fastenar by block height +/// Returns the result in `Option` +/// If the block does not exist, returns `None` pub async fn fetch_streamer_message( client: &FastNearClient, block_height: types::BlockHeight, @@ -26,6 +44,9 @@ pub async fn fetch_streamer_message( .await } +/// Fetches the block from the fastenar by block height +/// Returns the result in `near_indexer_primitives::views::BlockView` +/// If the block does not exist, returns an error pub async fn fetch_block( client: &FastNearClient, block_height: types::BlockHeight, @@ -41,6 +62,9 @@ pub async fn fetch_block( } } +/// Fetches the block from the fastenar by block height +/// Returns the result in `near_indexer_primitives::views::BlockView` +/// If the block does not exist, retries fetching the block pub 
async fn fetch_block_or_retry( client: &FastNearClient, block_height: types::BlockHeight, @@ -58,6 +82,9 @@ pub async fn fetch_block_or_retry( } } +/// Fetches the shard from the fastenar by block height and shard id +/// Returns the result in `near_indexer_primitives::IndexerShard` +/// If the block does not exist, returns an error pub async fn fetch_shard( client: &FastNearClient, block_height: types::BlockHeight, @@ -90,6 +117,9 @@ pub async fn fetch_shard( } } +/// Fetches the shard from the fastenar by block height and shard id +/// Returns the result in `near_indexer_primitives::IndexerShard` +/// If the block does not exist, retries fetching the block pub async fn fetch_shard_or_retry( client: &FastNearClient, block_height: types::BlockHeight, diff --git a/src/providers/fastnear/mod.rs b/src/providers/fastnear/mod.rs index 224ec7a..7bfd008 100644 --- a/src/providers/fastnear/mod.rs +++ b/src/providers/fastnear/mod.rs @@ -2,6 +2,11 @@ pub mod client; pub mod fetchers; pub mod types; +/// Starts the FastNear provider +/// Fetches the blocks from the FastNear and sends them to the blocks_sink +/// The fetching is done in parallel with multiple threads +/// The number of threads is defined in the FastNearConfig +/// The fetching starts from the start_block_height and continues until the last block #[allow(unused_labels)] // we use loop labels for code-readability pub async fn start( blocks_sink: tokio::sync::mpsc::Sender, diff --git a/src/providers/s3/fetchers.rs b/src/providers/s3/fetchers.rs index 76e5ba5..4028188 100644 --- a/src/providers/s3/fetchers.rs +++ b/src/providers/s3/fetchers.rs @@ -105,7 +105,7 @@ pub async fn list_block_heights( /// - shard_N.json /// Reads the content of the objects and parses as a JSON. 
/// Returns the result in `near_indexer_primitives::StreamerMessage` -pub(crate) async fn fetch_streamer_message( +pub async fn fetch_streamer_message( lake_s3_client: &dyn S3Client, s3_bucket_name: &str, block_height: types::BlockHeight, From b7291e0fc62cd4a621d0d097a757935f06f11b86 Mon Sep 17 00:00:00 2001 From: Yurii Koba Date: Wed, 2 Oct 2024 14:00:12 +0300 Subject: [PATCH 5/8] improvement --- src/lib.rs | 3 ++- src/providers/fastnear/client.rs | 4 ++-- src/providers/fastnear/mod.rs | 2 +- src/providers/fastnear/types.rs | 6 ++++++ 4 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 885e2ec..ff46685 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -280,9 +280,10 @@ extern crate derive_builder; pub use near_indexer_primitives; pub use aws_credential_types::Credentials; +pub use providers::fastnear::client as fastnear_client; pub use providers::fastnear::types::{FastNearConfig, FastNearConfigBuilder}; +pub use providers::s3::client as s3_client; pub use providers::s3::types::{LakeConfig, LakeConfigBuilder}; - pub mod providers; pub(crate) const LAKE_FRAMEWORK: &str = "near_lake_framework"; diff --git a/src/providers/fastnear/client.rs b/src/providers/fastnear/client.rs index 103884a..1d25d37 100644 --- a/src/providers/fastnear/client.rs +++ b/src/providers/fastnear/client.rs @@ -9,9 +9,9 @@ pub struct FastNearClient { } impl FastNearClient { - pub fn new(config: &types::FastNearConfig) -> Self { + pub fn new(endpoint: String) -> Self { Self { - endpoint: config.endpoint.clone(), + endpoint, client: reqwest::Client::new(), } } diff --git a/src/providers/fastnear/mod.rs b/src/providers/fastnear/mod.rs index 7bfd008..234c2e5 100644 --- a/src/providers/fastnear/mod.rs +++ b/src/providers/fastnear/mod.rs @@ -12,7 +12,7 @@ pub async fn start( blocks_sink: tokio::sync::mpsc::Sender, config: types::FastNearConfig, ) -> anyhow::Result<()> { - let client = client::FastNearClient::new(&config); + let client = config.client(); let 
max_num_threads = config.num_threads; let next_sink_block = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(config.start_block_height)); diff --git a/src/providers/fastnear/types.rs b/src/providers/fastnear/types.rs index eafc7ab..3dcc47c 100644 --- a/src/providers/fastnear/types.rs +++ b/src/providers/fastnear/types.rs @@ -31,6 +31,12 @@ pub struct FastNearConfig { pub(crate) blocks_preload_pool_size: usize, } +impl FastNearConfig { + pub fn client(&self) -> crate::fastnear_client::FastNearClient { + crate::fastnear_client::FastNearClient::new(self.endpoint.clone()) + } +} + impl FastNearConfigBuilder { /// Shortcut to set up [FastNearConfigBuilder] for mainnet /// ``` From a01006778c7ed8dfb8835dc7d36c6a6ef361361f Mon Sep 17 00:00:00 2001 From: Yurii Koba Date: Thu, 10 Oct 2024 15:49:05 +0300 Subject: [PATCH 6/8] reffactoring --- src/lib.rs | 21 ++-- src/providers/fastnear/client.rs | 9 +- src/providers/fastnear/fetchers.rs | 15 +++ src/providers/fastnear/mod.rs | 4 +- src/providers/fastnear/types.rs | 6 -- src/providers/s3/client.rs | 148 +++++++++++++++++++++++++++ src/providers/s3/fetchers.rs | 158 +---------------------------- src/providers/s3/mod.rs | 4 +- 8 files changed, 192 insertions(+), 173 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index ff46685..bf246a4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -280,11 +280,17 @@ extern crate derive_builder; pub use near_indexer_primitives; pub use aws_credential_types::Credentials; -pub use providers::fastnear::client as fastnear_client; + +mod providers; + +pub use providers::fastnear; +pub use providers::s3; + +pub use providers::fastnear::client::FastNearClient; pub use providers::fastnear::types::{FastNearConfig, FastNearConfigBuilder}; -pub use providers::s3::client as s3_client; + +pub use providers::s3::client::LakeS3Client; pub use providers::s3::types::{LakeConfig, LakeConfigBuilder}; -pub mod providers; pub(crate) const LAKE_FRAMEWORK: &str = "near_lake_framework"; @@ -317,11 +323,10 @@ pub fn 
streamer>( let (sender, receiver) = tokio::sync::mpsc::channel(config.blocks_preload_pool_size()); match config { providers::NearLakeFrameworkConfig::Lake(config) => { - (tokio::spawn(providers::s3::start(sender, config)), receiver) + (tokio::spawn(s3::start(sender, config)), receiver) + } + providers::NearLakeFrameworkConfig::FastNear(config) => { + (tokio::spawn(fastnear::start(sender, config)), receiver) } - providers::NearLakeFrameworkConfig::FastNear(config) => ( - tokio::spawn(providers::fastnear::start(sender, config)), - receiver, - ), } } diff --git a/src/providers/fastnear/client.rs b/src/providers/fastnear/client.rs index 1d25d37..daba3ef 100644 --- a/src/providers/fastnear/client.rs +++ b/src/providers/fastnear/client.rs @@ -4,8 +4,8 @@ use super::types; /// It is used to fetch the blocks from the FastNear #[derive(Clone, Debug)] pub struct FastNearClient { - client: reqwest::Client, endpoint: String, + client: reqwest::Client, } impl FastNearClient { @@ -16,6 +16,13 @@ impl FastNearClient { } } + pub fn from_conf(config: &types::FastNearConfig) -> Self { + Self { + endpoint: config.endpoint.clone(), + client: reqwest::Client::new(), + } + } + /// Fetches the block from the FastNear API /// Returns the result in `Option` /// If the block does not exist, returns `None` diff --git a/src/providers/fastnear/fetchers.rs b/src/providers/fastnear/fetchers.rs index 25fb813..62209f0 100644 --- a/src/providers/fastnear/fetchers.rs +++ b/src/providers/fastnear/fetchers.rs @@ -21,6 +21,21 @@ pub async fn fetch_optimistic_block( .expect("Failed to fetch optimistic block") } +/// Fetches the optimistic block from the fastenar +/// This function is used to fetch the optimistic block by height +/// This function will be using endpoint `/v0/block_opt/:block_height` +/// This would be waiting some time until the optimistic block is available +/// Returns `near_indexer_primitives::StreamerMessage` if the block is available +/// Returns `None` if the block height is 
skipped +pub async fn fetch_optimistic_block_by_height( + client: &FastNearClient, + block_height: types::BlockHeight, +) -> Option { + client + .fetch_until_success(&format!("/v0/block_opt/{}", block_height)) + .await +} + /// Fetches the genesis block from the fastenar /// Returns `near_indexer_primitives::StreamerMessage` pub async fn fetch_first_block( diff --git a/src/providers/fastnear/mod.rs b/src/providers/fastnear/mod.rs index 234c2e5..a201f92 100644 --- a/src/providers/fastnear/mod.rs +++ b/src/providers/fastnear/mod.rs @@ -1,3 +1,5 @@ +use crate::FastNearClient; + pub mod client; pub mod fetchers; pub mod types; @@ -12,7 +14,7 @@ pub async fn start( blocks_sink: tokio::sync::mpsc::Sender, config: types::FastNearConfig, ) -> anyhow::Result<()> { - let client = config.client(); + let client = FastNearClient::from_conf(&config); let max_num_threads = config.num_threads; let next_sink_block = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(config.start_block_height)); diff --git a/src/providers/fastnear/types.rs b/src/providers/fastnear/types.rs index 3dcc47c..eafc7ab 100644 --- a/src/providers/fastnear/types.rs +++ b/src/providers/fastnear/types.rs @@ -31,12 +31,6 @@ pub struct FastNearConfig { pub(crate) blocks_preload_pool_size: usize, } -impl FastNearConfig { - pub fn client(&self) -> crate::fastnear_client::FastNearClient { - crate::fastnear_client::FastNearClient::new(self.endpoint.clone()) - } -} - impl FastNearConfigBuilder { /// Shortcut to set up [FastNearConfigBuilder] for mainnet /// ``` diff --git a/src/providers/s3/client.rs b/src/providers/s3/client.rs index 6d718b4..70cf91e 100644 --- a/src/providers/s3/client.rs +++ b/src/providers/s3/client.rs @@ -80,3 +80,151 @@ pub trait S3Client: Send + Sync { start_after_prefix: &str, ) -> Result, ListCommonPrefixesError>; } + +#[derive(Clone, Debug)] +pub struct LakeS3Client { + s3: aws_sdk_s3::Client, +} + +impl LakeS3Client { + pub fn new(s3: aws_sdk_s3::Client) -> Self { + Self { s3 } + } + + 
pub fn from_conf(config: aws_sdk_s3::config::Config) -> Self { + let s3_client = aws_sdk_s3::Client::from_conf(config); + + Self { s3: s3_client } + } +} + +#[async_trait] +impl S3Client for LakeS3Client { + async fn get_object_bytes( + &self, + bucket: &str, + prefix: &str, + ) -> Result, GetObjectBytesError> { + let object = self + .s3 + .get_object() + .bucket(bucket) + .key(prefix) + .request_payer(aws_sdk_s3::types::RequestPayer::Requester) + .send() + .await?; + + let bytes = object.body.collect().await?.into_bytes().to_vec(); + + Ok(bytes) + } + + async fn list_common_prefixes( + &self, + bucket: &str, + start_after_prefix: &str, + ) -> Result, ListCommonPrefixesError> { + let response = self + .s3 + .list_objects_v2() + .max_keys(1000) // 1000 is the default and max value for this parameter + .delimiter("/".to_string()) + .start_after(start_after_prefix) + .request_payer(aws_sdk_s3::types::RequestPayer::Requester) + .bucket(bucket) + .send() + .await?; + + let prefixes = match response.common_prefixes { + None => vec![], + Some(common_prefixes) => common_prefixes + .into_iter() + .filter_map(|common_prefix| common_prefix.prefix) + .collect::>() + .into_iter() + .filter_map(|prefix_string| prefix_string.split('/').next().map(String::from)) + .collect(), + }; + + Ok(prefixes) + } +} + +#[cfg(test)] +mod test { + use super::*; + + use std::sync::Arc; + + use crate::providers::s3::fetchers::fetch_streamer_message; + use async_trait::async_trait; + + #[derive(Clone, Debug)] + pub struct LakeS3Client {} + + #[async_trait] + impl S3Client for LakeS3Client { + async fn get_object_bytes( + &self, + _bucket: &str, + prefix: &str, + ) -> Result, GetObjectBytesError> { + let path = format!("{}/blocks/{}", env!("CARGO_MANIFEST_DIR"), prefix); + tokio::fs::read(path) + .await + .map_err(|e| GetObjectBytesError(Arc::new(e))) + } + + async fn list_common_prefixes( + &self, + _bucket: &str, + _start_after: &str, + ) -> Result, ListCommonPrefixesError> { + Ok(Vec::new()) + } 
+ } + + #[tokio::test] + async fn deserializes_meta_transactions() { + let lake_client = LakeS3Client {}; + let streamer_message = + fetch_streamer_message(&lake_client, "near-lake-data-mainnet", 879765) + .await + .unwrap(); + + let delegate_action = &streamer_message.shards[0] + .chunk + .as_ref() + .unwrap() + .transactions[0] + .transaction + .actions[0]; + + assert_eq!( + serde_json::to_value(delegate_action).unwrap(), + serde_json::json!({ + "Delegate": { + "delegate_action": { + "sender_id": "test.near", + "receiver_id": "test.near", + "actions": [ + { + "AddKey": { + "public_key": "ed25519:CnQMksXTTtn81WdDujsEMQgKUMkFvDJaAjDeDLTxVrsg", + "access_key": { + "nonce": 0, + "permission": "FullAccess" + } + } + } + ], + "nonce": 879546, + "max_block_height": 100, + "public_key": "ed25519:8Rn4FJeeRYcrLbcrAQNFVgvbZ2FCEQjgydbXwqBwF1ib" + }, + "signature": "ed25519:25uGrsJNU3fVgUpPad3rGJRy2XQum8gJxLRjKFCbd7gymXwUxQ9r3tuyBCD6To7SX5oSJ2ScJZejwqK1ju8WdZfS" + } + }) + ); + } +} diff --git a/src/providers/s3/fetchers.rs b/src/providers/s3/fetchers.rs index 4028188..0d360f9 100644 --- a/src/providers/s3/fetchers.rs +++ b/src/providers/s3/fetchers.rs @@ -1,80 +1,6 @@ use std::str::FromStr; -use async_trait::async_trait; - -use super::{ - client::{GetObjectBytesError, ListCommonPrefixesError, S3Client}, - types, -}; - -#[derive(Clone, Debug)] -pub struct LakeS3Client { - s3: aws_sdk_s3::Client, -} - -impl LakeS3Client { - pub fn new(s3: aws_sdk_s3::Client) -> Self { - Self { s3 } - } - - pub fn from_conf(config: aws_sdk_s3::config::Config) -> Self { - let s3_client = aws_sdk_s3::Client::from_conf(config); - - Self { s3: s3_client } - } -} - -#[async_trait] -impl S3Client for LakeS3Client { - async fn get_object_bytes( - &self, - bucket: &str, - prefix: &str, - ) -> Result, GetObjectBytesError> { - let object = self - .s3 - .get_object() - .bucket(bucket) - .key(prefix) - .request_payer(aws_sdk_s3::types::RequestPayer::Requester) - .send() - .await?; - - let bytes = 
object.body.collect().await?.into_bytes().to_vec(); - - Ok(bytes) - } - - async fn list_common_prefixes( - &self, - bucket: &str, - start_after_prefix: &str, - ) -> Result, ListCommonPrefixesError> { - let response = self - .s3 - .list_objects_v2() - .max_keys(1000) // 1000 is the default and max value for this parameter - .delimiter("/".to_string()) - .start_after(start_after_prefix) - .request_payer(aws_sdk_s3::types::RequestPayer::Requester) - .bucket(bucket) - .send() - .await?; - - let prefixes = match response.common_prefixes { - None => vec![], - Some(common_prefixes) => common_prefixes - .into_iter() - .filter_map(|common_prefix| common_prefix.prefix) - .collect::>() - .into_iter() - .filter_map(|prefix_string| prefix_string.split('/').next().map(String::from)) - .collect(), - }; - - Ok(prefixes) - } -} +use super::{client::S3Client, types}; /// Queries the list of the objects in the bucket, grouped by "/" delimiter. /// Returns the list of block heights that can be fetched @@ -103,8 +29,8 @@ pub async fn list_block_heights( /// By the given block height gets the objects: /// - block.json /// - shard_N.json -/// Reads the content of the objects and parses as a JSON. -/// Returns the result in `near_indexer_primitives::StreamerMessage` +/// Reads the content of the objects and parses as a JSON. 
+/// Returns the result in `near_indexer_primitives::StreamerMessage` pub async fn fetch_streamer_message( lake_s3_client: &dyn S3Client, s3_bucket_name: &str, @@ -254,81 +180,3 @@ pub async fn fetch_shard_or_retry( } } } - -#[cfg(test)] -mod test { - use super::*; - - use std::sync::Arc; - - use async_trait::async_trait; - - #[derive(Clone, Debug)] - pub struct LakeS3Client {} - - #[async_trait] - impl S3Client for LakeS3Client { - async fn get_object_bytes( - &self, - _bucket: &str, - prefix: &str, - ) -> Result, GetObjectBytesError> { - let path = format!("{}/blocks/{}", env!("CARGO_MANIFEST_DIR"), prefix); - tokio::fs::read(path) - .await - .map_err(|e| GetObjectBytesError(Arc::new(e))) - } - - async fn list_common_prefixes( - &self, - _bucket: &str, - _start_after: &str, - ) -> Result, ListCommonPrefixesError> { - Ok(Vec::new()) - } - } - - #[tokio::test] - async fn deserializes_meta_transactions() { - let lake_client = LakeS3Client {}; - let streamer_message = - fetch_streamer_message(&lake_client, "near-lake-data-mainnet", 879765) - .await - .unwrap(); - - let delegate_action = &streamer_message.shards[0] - .chunk - .as_ref() - .unwrap() - .transactions[0] - .transaction - .actions[0]; - - assert_eq!( - serde_json::to_value(delegate_action).unwrap(), - serde_json::json!({ - "Delegate": { - "delegate_action": { - "sender_id": "test.near", - "receiver_id": "test.near", - "actions": [ - { - "AddKey": { - "public_key": "ed25519:CnQMksXTTtn81WdDujsEMQgKUMkFvDJaAjDeDLTxVrsg", - "access_key": { - "nonce": 0, - "permission": "FullAccess" - } - } - } - ], - "nonce": 879546, - "max_block_height": 100, - "public_key": "ed25519:8Rn4FJeeRYcrLbcrAQNFVgvbZ2FCEQjgydbXwqBwF1ib" - }, - "signature": "ed25519:25uGrsJNU3fVgUpPad3rGJRy2XQum8gJxLRjKFCbd7gymXwUxQ9r3tuyBCD6To7SX5oSJ2ScJZejwqK1ju8WdZfS" - } - }) - ); - } -} diff --git a/src/providers/s3/mod.rs b/src/providers/s3/mod.rs index aeeb2bc..f326d7c 100644 --- a/src/providers/s3/mod.rs +++ b/src/providers/s3/mod.rs @@ 
-106,14 +106,14 @@ pub(crate) async fn start( let lake_s3_client: Box = if let Some(s3_client) = config.s3_client { s3_client } else if let Some(config) = config.s3_config { - Box::new(fetchers::LakeS3Client::from_conf(config)) + Box::new(client::LakeS3Client::from_conf(config)) } else { let aws_config = aws_config::from_env().load().await; let s3_config = aws_sdk_s3::config::Builder::from(&aws_config) .region(aws_types::region::Region::new(config.s3_region_name)) .build(); - Box::new(fetchers::LakeS3Client::from_conf(s3_config)) + Box::new(client::LakeS3Client::from_conf(s3_config)) }; let mut last_processed_block_hash: Option = None; From f95205292455d55edc774c6bf2c6dfb410217a69 Mon Sep 17 00:00:00 2001 From: Yurii Koba Date: Thu, 24 Oct 2024 11:24:20 +0300 Subject: [PATCH 7/8] improvement --- CHANGELOG.md | 10 +++++----- README.md | 1 + 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ba317ee..6332ff2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,19 +10,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [0.7.10](https://github.com/near/near-lake-framework/compare/v0.7.9...0.7.10) * Upgrade `near-indexer-primitives` to `0.27.0` (nearcore-2.3.0) -* Added new provider `fastnear` - a new way to get the data from NEAR Protocol. It is a separate service that provides the data in a more efficient way. Check the [FastNear](https://fastnear.com/) provider for more details. +* Added new provider `fastnear` - a new way to get the data from NEAR Protocol. It is a separate service that provides the data in a more efficient way. Check the [FastNear](https://fastnear.com/) and [Near Data Server](https://github.com/fastnear/neardata-server/) to get more details about provider. ### Breaking Change * `s3_fetchers` rename to `fetchers` and move to the `providers` module. 
New usage example: ```rust - use near_lake_framework::providers::s3::fetchers::fetch_streamer_message; - use near_lake_framework::providers::fastnear::fetchers::fetch_streamer_message; + use near_lake_framework::s3::fetchers::fetch_streamer_message; + use near_lake_framework::fastnear::fetchers::fetch_streamer_message; ``` * `s3_client` rename to `client` and move to the `providers` module. New usage example: ```rust - use near_lake_framework::providers::s3::client::S3Client; - use near_lake_framework::providers::fastnear::client::FastNearClient; + use near_lake_framework::S3Client; + use near_lake_framework::FastNearClient; ``` ## [0.7.9](https://github.com/near/near-lake-framework/compare/v0.7.7...0.7.9) diff --git a/README.md b/README.md index 45a35b6..3c18e64 100644 --- a/README.md +++ b/README.md @@ -153,6 +153,7 @@ The price depends on the number of shards FastNear provides a service to access the NEAR Protocol data - [FastNear](https://fastnear.com/) +- [Near Data Server](https://github.com/fastnear/neardata-server/) FastNear provider is a new way to get the data from NEAR Protocol. It is a separate service that provides the data in a more efficient way. 
From ab9b1d1a5176ac14017f253d448cd93c83cd61a3 Mon Sep 17 00:00:00 2001 From: Yurii Koba Date: Wed, 6 Nov 2024 14:07:49 +0200 Subject: [PATCH 8/8] update near-indexer-primitives to 0.27.0 --- Cargo.toml | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e214c90..f3504c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,11 +8,11 @@ categories = ["asynchronous", "api-bindings", "network-programming"] keywords = ["near", "near-lake", "near-indexer"] authors = ["Near Inc "] edition = "2021" -rust-version = "1.79.0" +rust-version = "1.81.0" # cargo-workspaces [workspace.metadata.workspaces] -version = "0.7.9" +version = "0.7.10" [dependencies] anyhow = "1.0.79" @@ -33,10 +33,7 @@ tokio = { version = "1.35.1", features = ["sync", "time", "rt", "macros"] } tokio-stream = { version = "0.1.14" } tracing = "0.1.40" -# Bug with deserialization of the transactions it should be fixed in the next release 0.27.0 -# near-indexer-primitives = "0.26.0" -# for now we use the forked version -near-indexer-primitives = { git = 'https://github.com/kobayurii/nearcore.git', branch = "2.2.1-fork1" } +near-indexer-primitives = "0.27.0" [lib] doctest = false