From bc806789cc7e7d71b26df76d09a6832444eefff6 Mon Sep 17 00:00:00 2001 From: Phuong N Date: Thu, 16 Jan 2025 20:20:24 +0000 Subject: [PATCH] Report error on lake build failure --- chain-signatures/node/src/indexer.rs | 92 +++++++++++++++++----------- 1 file changed, 55 insertions(+), 37 deletions(-) diff --git a/chain-signatures/node/src/indexer.rs b/chain-signatures/node/src/indexer.rs index 7fdd5eb3..c7783b27 100644 --- a/chain-signatures/node/src/indexer.rs +++ b/chain-signatures/node/src/indexer.rs @@ -3,7 +3,7 @@ use crate::storage::app_data_storage::AppDataStorage; use crypto_shared::{derive_epsilon, ScalarExt}; use k256::Scalar; use near_account_id::AccountId; -use near_lake_framework::{LakeBuilder, LakeContext}; +use near_lake_framework::{Lake, LakeBuilder, LakeContext}; use near_lake_primitives::actions::ActionMetaDataExt; use near_lake_primitives::receipts::ExecutionStatus; @@ -324,52 +324,41 @@ pub fn run( .enable_all() .build()?; + let mut start = Instant::now(); // If indexer fails for whatever reason, let's spin it back up: let mut i = 0; loop { if i > 0 { - tracing::warn!("restarting indexer after failure: restart count={i}"); + tracing::warn!( + restart_count = i, + elapsed = ?start.elapsed(), + "restarting indexer after failure", + ); + start = Instant::now(); } i += 1; - let Ok(lake) = rt.block_on(async { - update_last_processed_block(rpc_client.clone(), app_data_storage.clone()).await?; - - if let Some(latest) = context.indexer.last_processed_block().await { - if i > 0 { - tracing::warn!("indexer latest height {latest}, restart count={i}"); - } - let mut lake_builder = LakeBuilder::default() - .s3_bucket_name(&options.s3_bucket) - .s3_region_name(&options.s3_region) - .start_block_height(latest); - - if let Some(s3_url) = &options.s3_url { - let aws_config = aws_config::from_env().load().await; - let s3_config = aws_sdk_s3::config::Builder::from(&aws_config) - .endpoint_url(s3_url) - .build(); - lake_builder = lake_builder.s3_config(s3_config); - } - let lake = lake_builder.build()?; - anyhow::Ok(lake) - } else { - tracing::warn!("indexer failed to get last processed block"); - Err(anyhow::anyhow!("failed to get last processed block")) + let lake = match rt.block_on(build_lake( + &rpc_client, + &app_data_storage, + &context, + &options, + )) { + Ok(lake) => lake, + Err(err) => { + tracing::error!(?options, ?err, "indexer failed to build"); + backoff(i, 1, 120); + continue; } - }) else { - tracing::error!(?options, "indexer failed to build"); - backoff(i, 1, 120); - continue; }; // TODO/NOTE: currently indexer does not have any interrupt handlers and will never yield back // as successful. We can add interrupt handlers in the future but this is not important right // now since we managing nodes through integration tests that can kill it or through docker. - let join_handle = { + let join_handle = rt.spawn({ let context = context.clone(); - rt.spawn(async move { lake.run_with_context_async(handle_block, &context).await }) - }; + async move { lake.run_with_context_async(handle_block, &context).await } + }); let outcome = rt.block_on(async { if i > 0 { // give it some time to catch up @@ -407,7 +396,7 @@ pub fn run( } } - backoff(i, 1, 120) + backoff(i, 1, 120); } Ok(()) }); @@ -415,10 +404,39 @@ pub fn run( Ok((join_handle, indexer)) } +async fn build_lake( + rpc_client: &near_fetch::Client, + app_data_storage: &AppDataStorage, + context: &Context, + options: &Options, +) -> anyhow::Result { + update_last_processed_block(rpc_client, app_data_storage).await?; + + let Some(latest) = context.indexer.last_processed_block().await else { + tracing::warn!("indexer failed to get last processed block"); + anyhow::bail!("failed to get last processed block"); + }; + + tracing::info!(latest, "indexer latest height"); + let mut lake_builder = LakeBuilder::default() + .s3_bucket_name(&options.s3_bucket) + .s3_region_name(&options.s3_region) + .start_block_height(latest); + + if let Some(s3_url) = &options.s3_url { + let aws_config = aws_config::from_env().load().await; + let s3_config = aws_sdk_s3::config::Builder::from(&aws_config) + .endpoint_url(s3_url) + .build(); + lake_builder = lake_builder.s3_config(s3_config); + } + Ok(lake_builder.build()?) +} + /// This function ensures we do not go back in time a lot when restarting the node async fn update_last_processed_block( - rpc_client: near_fetch::Client, - app_data_storage: AppDataStorage, + rpc_client: &near_fetch::Client, + app_data_storage: &AppDataStorage, ) -> anyhow::Result<()> { let last_processed_block = match app_data_storage.last_processed_block().await { Ok(Some(block_height)) => block_height, @@ -464,5 +482,5 @@ async fn update_last_processed_block( fn backoff(i: u32, multiplier: u32, max: u64) { // Exponential backoff with max delay of max seconds let delay: u64 = std::cmp::min(2u64.pow(i).mul(multiplier as u64), max); - std::thread::sleep(std::time::Duration::from_secs(delay)); + std::thread::sleep(Duration::from_secs(delay)); }