Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: report error on lake build failure #112

Merged
merged 1 commit into from
Jan 17, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 55 additions & 37 deletions chain-signatures/node/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::storage::app_data_storage::AppDataStorage;
use crypto_shared::{derive_epsilon, ScalarExt};
use k256::Scalar;
use near_account_id::AccountId;
use near_lake_framework::{LakeBuilder, LakeContext};
use near_lake_framework::{Lake, LakeBuilder, LakeContext};
use near_lake_primitives::actions::ActionMetaDataExt;
use near_lake_primitives::receipts::ExecutionStatus;

Expand Down Expand Up @@ -324,52 +324,41 @@ pub fn run(
.enable_all()
.build()?;

let mut start = Instant::now();
// If indexer fails for whatever reason, let's spin it back up:
let mut i = 0;
loop {
if i > 0 {
tracing::warn!("restarting indexer after failure: restart count={i}");
tracing::warn!(
restart_count = i,
elapsed = ?start.elapsed(),
"restarting indexer after failure",
);
start = Instant::now();
}
i += 1;

let Ok(lake) = rt.block_on(async {
update_last_processed_block(rpc_client.clone(), app_data_storage.clone()).await?;

if let Some(latest) = context.indexer.last_processed_block().await {
if i > 0 {
tracing::warn!("indexer latest height {latest}, restart count={i}");
}
let mut lake_builder = LakeBuilder::default()
.s3_bucket_name(&options.s3_bucket)
.s3_region_name(&options.s3_region)
.start_block_height(latest);

if let Some(s3_url) = &options.s3_url {
let aws_config = aws_config::from_env().load().await;
let s3_config = aws_sdk_s3::config::Builder::from(&aws_config)
.endpoint_url(s3_url)
.build();
lake_builder = lake_builder.s3_config(s3_config);
}
let lake = lake_builder.build()?;
anyhow::Ok(lake)
} else {
tracing::warn!("indexer failed to get last processed block");
Err(anyhow::anyhow!("failed to get last processed block"))
let lake = match rt.block_on(build_lake(
&rpc_client,
&app_data_storage,
&context,
&options,
)) {
Ok(lake) => lake,
Err(err) => {
tracing::error!(?options, ?err, "indexer failed to build");
backoff(i, 1, 120);
continue;
}
}) else {
tracing::error!(?options, "indexer failed to build");
backoff(i, 1, 120);
continue;
};

// TODO/NOTE: currently the indexer does not have any interrupt handlers and will never yield
// back as successful. We can add interrupt handlers in the future, but this is not important
// right now since we are managing nodes through integration tests that can kill it or through docker.
let join_handle = {
let join_handle = rt.spawn({
let context = context.clone();
rt.spawn(async move { lake.run_with_context_async(handle_block, &context).await })
};
async move { lake.run_with_context_async(handle_block, &context).await }
});
let outcome = rt.block_on(async {
if i > 0 {
// give it some time to catch up
Expand Down Expand Up @@ -407,18 +396,47 @@ pub fn run(
}
}

backoff(i, 1, 120)
backoff(i, 1, 120);
}
Ok(())
});

Ok((join_handle, indexer))
}

async fn build_lake(
rpc_client: &near_fetch::Client,
app_data_storage: &AppDataStorage,
context: &Context,
options: &Options,
) -> anyhow::Result<Lake> {
update_last_processed_block(rpc_client, app_data_storage).await?;

let Some(latest) = context.indexer.last_processed_block().await else {
tracing::warn!("indexer failed to get last processed block");
anyhow::bail!("failed to get last processed block");
};

tracing::info!(latest, "indexer latest height");
let mut lake_builder = LakeBuilder::default()
.s3_bucket_name(&options.s3_bucket)
.s3_region_name(&options.s3_region)
.start_block_height(latest);

if let Some(s3_url) = &options.s3_url {
let aws_config = aws_config::from_env().load().await;
let s3_config = aws_sdk_s3::config::Builder::from(&aws_config)
.endpoint_url(s3_url)
.build();
lake_builder = lake_builder.s3_config(s3_config);
}
Ok(lake_builder.build()?)
}

/// This function ensures we do not go back in time a lot when restarting the node
async fn update_last_processed_block(
rpc_client: near_fetch::Client,
app_data_storage: AppDataStorage,
rpc_client: &near_fetch::Client,
app_data_storage: &AppDataStorage,
) -> anyhow::Result<()> {
let last_processed_block = match app_data_storage.last_processed_block().await {
Ok(Some(block_height)) => block_height,
Expand Down Expand Up @@ -464,5 +482,5 @@ async fn update_last_processed_block(
/// Blocks the current thread for an exponentially growing delay, capped at `max` seconds.
///
/// The delay is `2^i * multiplier` seconds, clamped to `max`. If that product does not fit
/// in a `u64` (for example once `i` reaches 64), the delay saturates to `max` instead of
/// panicking or wrapping around to a tiny value.
///
/// * `i` - attempt counter used as the exponent.
/// * `multiplier` - scale factor applied to `2^i`, in seconds.
/// * `max` - upper bound for the delay, in seconds.
fn backoff(i: u32, multiplier: u32, max: u64) {
    // Checked arithmetic: an overflowing product is by definition larger than `max`,
    // so fall back to `max` rather than letting `pow`/`mul` overflow.
    let delay: u64 = 2u64
        .checked_pow(i)
        .and_then(|base| base.checked_mul(u64::from(multiplier)))
        .map_or(max, |secs| secs.min(max));
    std::thread::sleep(std::time::Duration::from_secs(delay));
}
Loading