diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0411296..091999f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,9 +5,15 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
-## [Unreleased](https://github.com/near/near-lake-framework/compare/v0.7.4...HEAD)
+## [Unreleased](https://github.com/near/near-lake-framework/compare/v0.7.5...HEAD)
 
-## [0.7.3](https://github.com/near/near-lake-framework/compare/v0.7.2...0.7.4)
+## [0.7.5](https://github.com/near/near-lake-framework/compare/v0.7.3...0.7.5)
+
+* Refactor `s3_fetchers` module to allow exposing the underlying functionality:
+  * `s3_fetchers::fetch_block` (without retrying)
+  * `s3_fetchers::fetch_shard` (without retrying)
+
+## [0.7.4](https://github.com/near/near-lake-framework/compare/v0.7.2...0.7.4)
 
 * Upgrade all `aws` crates to the latest version
 * Undeylying AWS error will now be printed
diff --git a/Cargo.toml b/Cargo.toml
index 883acc8..5fc437e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -12,7 +12,7 @@ rust-version = "1.58.1"
 
 # cargo-workspaces
 [workspace.metadata.workspaces]
-version = "0.7.4"
+version = "0.7.5"
 
 [dependencies]
 anyhow = "1.0.51"
@@ -20,6 +20,7 @@ aws-config = { version = "1.0.0", features = ["behavior-version-latest"] }
 aws-types = "1.0.0"
 aws-credential-types = "1.0.0"
 aws-sdk-s3 = "0.39.1"
+aws-smithy-types = "1.0.1"
 async-stream = "0.3.3"
 async-trait = "0.1.64"
 derive_builder = "0.11.2"
@@ -35,7 +36,6 @@ near-indexer-primitives = "0.17"
 
 [dev-dependencies]
 aws-smithy-http = "0.60.0"
-aws-smithy-types = "1.0.1"
 
 [lib]
 doctest = false
diff --git a/src/s3_fetchers.rs b/src/s3_fetchers.rs
index 577c0da..1f13461 100644
--- a/src/s3_fetchers.rs
+++ b/src/s3_fetchers.rs
@@ -142,7 +142,7 @@ pub(crate) async fn fetch_streamer_message(
 }
 
 /// Fetches the block data JSON from AWS S3 and returns the `BlockView`
-pub async fn fetch_block_or_retry(
+pub async fn fetch_block(
     lake_s3_client: &impl S3Client,
     s3_bucket_name: &str,
     block_height: crate::types::BlockHeight,
@@ -150,42 +150,63 @@
     near_indexer_primitives::views::BlockView,
     crate::types::LakeError,
 > {
-    let body_bytes = loop {
-        match lake_s3_client
-            .get_object(s3_bucket_name, &format!("{:0>12}/block.json", block_height))
-            .await
-        {
-            Ok(response) => {
-                match response.body.collect().await {
-                    Ok(bytes_stream) => break bytes_stream.into_bytes(),
-                    Err(err) => {
-                        tracing::debug!(
-                            target: crate::LAKE_FRAMEWORK,
-                            "Failed to read bytes from the block #{:0>12} response. Retrying immediately.\n{:#?}",
-                            block_height,
-                            err,
-                        );
-                    }
-                };
-            }
-            Err(err) => {
-                tracing::debug!(
-                    target: crate::LAKE_FRAMEWORK,
-                    "Failed to get {:0>12}/block.json. Retrying immediately\n{:#?}",
-                    block_height,
-                    err
-                );
-            }
-        };
-    };
+    let body_bytes = lake_s3_client
+        .get_object(s3_bucket_name, &format!("{:0>12}/block.json", block_height))
+        .await?
+        .body
+        .collect()
+        .await?
+        .into_bytes();
 
     Ok(serde_json::from_slice::<
         near_indexer_primitives::views::BlockView,
     >(body_bytes.as_ref())?)
 }
 
+/// Fetches the block data JSON from AWS S3 and returns the `BlockView`, retrying until it succeeds (indefinitely)
+pub async fn fetch_block_or_retry(
+    lake_s3_client: &impl S3Client,
+    s3_bucket_name: &str,
+    block_height: crate::types::BlockHeight,
+) -> Result<
+    near_indexer_primitives::views::BlockView,
+    crate::types::LakeError,
+> {
+    loop {
+        match fetch_block(lake_s3_client, s3_bucket_name, block_height).await {
+            Ok(block_view) => break Ok(block_view),
+            Err(err) => match err {
+                crate::types::LakeError::AwsError { .. } => {
+                    tracing::debug!(
+                        target: crate::LAKE_FRAMEWORK,
+                        "Block #{:0>12} not found. Retrying immediately...\n{:#?}",
+                        block_height,
+                        err,
+                    );
+                }
+                crate::types::LakeError::AwsSmithyError { .. } => {
+                    tracing::debug!(
+                        target: crate::LAKE_FRAMEWORK,
+                        "Failed to read bytes from the block #{:0>12} response. Retrying immediately.\n{:#?}",
+                        block_height,
+                        err,
+                    );
+                }
+                _ => {
+                    tracing::debug!(
+                        target: crate::LAKE_FRAMEWORK,
+                        "Failed to fetch block #{}, retrying immediately\n{:#?}",
+                        block_height,
+                        err
+                    );
+                }
+            },
+        }
+    }
+}
+
 /// Fetches the shard data JSON from AWS S3 and returns the `IndexerShard`
-pub async fn fetch_shard_or_retry(
+pub async fn fetch_shard(
     lake_s3_client: &impl S3Client,
     s3_bucket_name: &str,
     block_height: crate::types::BlockHeight,
@@ -194,48 +215,68 @@
     near_indexer_primitives::IndexerShard,
     crate::types::LakeError,
 > {
-    let body_bytes = loop {
-        match lake_s3_client
-            .get_object(
-                s3_bucket_name,
-                &format!("{:0>12}/shard_{}.json", block_height, shard_id),
-            )
-            .await
-        {
-            Ok(response) => {
-                let body_bytes = match response.body.collect().await {
-                    Ok(body) => body.into_bytes(),
-                    Err(err) => {
-                        tracing::debug!(
-                            target: crate::LAKE_FRAMEWORK,
-                            "Failed to read the {:0>12}/shard_{}.json. Retrying in 1s...\n {:#?}",
-                            block_height,
-                            shard_id,
-                            err,
-                        );
-                        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
-                        continue;
-                    }
-                };
-
-                break body_bytes;
-            }
-            Err(err) => {
-                tracing::debug!(
-                    target: crate::LAKE_FRAMEWORK,
-                    "Failed to fetch shard #{}, retrying immediately\n{:#?}",
-                    shard_id,
-                    err
-                );
-            }
-        }
-    };
+    let body_bytes = lake_s3_client
+        .get_object(
+            s3_bucket_name,
+            &format!("{:0>12}/shard_{}.json", block_height, shard_id),
+        )
+        .await?
+        .body
+        .collect()
+        .await?
+        .into_bytes();
 
     Ok(serde_json::from_slice::<
         near_indexer_primitives::IndexerShard,
     >(body_bytes.as_ref())?)
 }
 
+/// Fetches the shard data JSON from AWS S3 and returns the `IndexerShard`, retrying until it succeeds (indefinitely)
+pub async fn fetch_shard_or_retry(
+    lake_s3_client: &impl S3Client,
+    s3_bucket_name: &str,
+    block_height: crate::types::BlockHeight,
+    shard_id: u64,
+) -> Result<
+    near_indexer_primitives::IndexerShard,
+    crate::types::LakeError,
+> {
+    loop {
+        match fetch_shard(lake_s3_client, s3_bucket_name, block_height, shard_id).await {
+            Ok(shard) => break Ok(shard),
+            Err(err) => match err {
+                crate::types::LakeError::AwsError { .. } => {
+                    tracing::debug!(
+                        target: crate::LAKE_FRAMEWORK,
+                        "Shard {} of block #{:0>12} not found. Retrying immediately...\n{:#?}",
+                        shard_id,
+                        block_height,
+                        err,
+                    );
+                }
+                crate::types::LakeError::AwsSmithyError { .. } => {
+                    tracing::debug!(
+                        target: crate::LAKE_FRAMEWORK,
+                        "Failed to read bytes from the shard {} of block #{:0>12} response. Retrying immediately.\n{:#?}",
+                        shard_id,
+                        block_height,
+                        err,
+                    );
+                }
+                _ => {
+                    tracing::debug!(
+                        target: crate::LAKE_FRAMEWORK,
+                        "Failed to fetch shard {} of block #{}, retrying immediately\n{:#?}",
+                        shard_id,
+                        block_height,
+                        err
+                    );
+                }
+            },
+        }
+    }
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
@@ -255,7 +296,7 @@ mod test {
     impl S3Client for LakeS3Client {
         async fn get_object(
             &self,
-            bucket: &str,
+            _bucket: &str,
             prefix: &str,
         ) -> Result<
             aws_sdk_s3::operation::get_object::GetObjectOutput,
@@ -269,8 +310,8 @@
 
         async fn list_objects(
             &self,
-            bucket: &str,
-            start_after: &str,
+            _bucket: &str,
+            _start_after: &str,
         ) -> Result<
             aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output,
             aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error>,
diff --git a/src/types.rs b/src/types.rs
index f4aa91b..fe255d8 100644
--- a/src/types.rs
+++ b/src/types.rs
@@ -115,19 +115,24 @@ impl LakeConfigBuilder {
 #[allow(clippy::enum_variant_names)]
 #[derive(thiserror::Error, Debug)]
 pub enum LakeError {
-    #[error("Failed to parse structure from JSON: {error_message}")]
+    #[error("Failed to parse structure from JSON: {error_message:?}")]
     ParseError {
         #[from]
         error_message: serde_json::Error,
     },
-    #[error("AWS S3 error: {error}")]
+    #[error("AWS S3 error: {error:?}")]
     AwsError {
         #[from]
         error: aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::get_object::GetObjectError>,
     },
-    #[error("Failed to convert integer")]
+    #[error("Failed to convert integer: {error:?}")]
     IntConversionError {
         #[from]
         error: std::num::TryFromIntError,
     },
+    #[error("AWS Smithy byte_stream error: {error:?}")]
+    AwsSmithyError {
+        #[from]
+        error: aws_smithy_types::byte_stream::error::Error,
+    },
 }
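This change splits each `*_or_retry` wrapper from a plain fetch that surfaces its first error. As a usage illustration, here is a minimal caller-side sketch built on the newly exposed functions; the helper name `fetch_block_with_shards`, the crate-internal paths, and the one-shard-file-per-chunk assumption are illustrative, not part of this diff:

```rust
// Hypothetical helper (illustrative, not part of this change): fetch a block
// and all of its shards once, propagating the first error to the caller
// instead of retrying indefinitely. Paths are written as from inside the crate.
async fn fetch_block_with_shards(
    client: &impl crate::s3_fetchers::S3Client,
    bucket: &str,
    block_height: crate::types::BlockHeight,
) -> Result<
    (
        near_indexer_primitives::views::BlockView,
        Vec<near_indexer_primitives::IndexerShard>,
    ),
    crate::types::LakeError,
> {
    // `fetch_block` now returns `Err` on the first failure (missing object,
    // interrupted byte stream, JSON parse error) rather than looping.
    let block = crate::s3_fetchers::fetch_block(client, bucket, block_height).await?;

    // Assumption: one shard file per chunk in the block, keyed by shard id.
    let mut shards = Vec::with_capacity(block.chunks.len());
    for shard_id in 0..block.chunks.len() as u64 {
        shards
            .push(crate::s3_fetchers::fetch_shard(client, bucket, block_height, shard_id).await?);
    }
    Ok((block, shards))
}
```

Unlike `fetch_block_or_retry`/`fetch_shard_or_retry`, which loop on `LakeError::AwsError` and `LakeError::AwsSmithyError`, a caller written this way can decide for itself whether a missing object or an interrupted byte stream warrants an immediate retry, a backoff, or a hard failure.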