diff --git a/components/chainhook-cli/src/scan/bitcoin.rs b/components/chainhook-cli/src/scan/bitcoin.rs index b8bf7a49a..5bc8b4554 100644 --- a/components/chainhook-cli/src/scan/bitcoin.rs +++ b/components/chainhook-cli/src/scan/bitcoin.rs @@ -1,4 +1,5 @@ use crate::config::{Config, PredicatesApi}; +use crate::scan::common::get_block_heights_to_scan; use crate::service::{ open_readwrite_predicates_db_conn_or_panic, set_confirmed_expiration_status, set_predicate_scanning_status, set_unconfirmed_expiration_status, ScanningData, @@ -19,7 +20,7 @@ use chainhook_sdk::observer::{gather_proofs, EventObserverConfig}; use chainhook_sdk::types::{ BitcoinBlockData, BitcoinChainEvent, BitcoinChainUpdatedWithBlocksData, BlockIdentifier, Chain, }; -use chainhook_sdk::utils::{file_append, send_request, BlockHeights, BlockHeightsError, Context}; +use chainhook_sdk::utils::{file_append, send_request, Context}; use std::collections::HashMap; pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate( @@ -39,73 +40,28 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate( return Err(format!("Bitcoin RPC error: {}", message.to_string())); } }; - let mut floating_end_block = false; - let mut block_heights_to_scan = if let Some(ref blocks) = predicate_spec.blocks { - match BlockHeights::Blocks(blocks.clone()).get_sorted_entries() { - Ok(heights) => heights, - Err(e) => match e { - BlockHeightsError::ExceedsMaxEntries(max, specified) => { - return Err(format!("Chainhook specification exceeds max number of blocks to scan. Maximum: {}, Attempted: {}", max, specified)); - } - BlockHeightsError::StartLargerThanEnd => unreachable!(), - }, - } - } else { - let start_block = match &unfinished_scan_data { - Some(scan_data) => scan_data.last_evaluated_block_height, - None => predicate_spec.start_block.unwrap_or(0), - }; - let chain_tip = match bitcoin_rpc.get_blockchain_info() { - Ok(result) => result.blocks, - Err(e) => { - return Err(format!( - "unable to retrieve Bitcoin chain tip ({})", - e.to_string() - )); - } - }; - let (end_block, update_end_block) = if let Some(end_block) = predicate_spec.end_block { - if start_block > end_block { - return Err( - "Chainhook specification field `end_block` should be greater than `start_block`." - .into(), - ); - } - // if the user provided an end block that is above the chain tip, we'll - // only scan up to the chain tip, then go to streaming mode - if end_block > chain_tip { - (chain_tip, true) - } else { - (end_block, false) - } - } else { - (chain_tip, true) - }; - - // we've already made this check with a user-provided end_block. But if the user didn't provide one, or if - // they provided one greater than chain tip, we could still have a start block that's greater then end block. - // but this time, it's not an error, we just want to start streaming. - if start_block > end_block { - info!(ctx.expect_logger(), "Chainhook specification field `start_block` is greater than Bitcoin chain tip for predicate {}. Switching to streaming mode.", predicate_spec.uuid); - return Ok(false); + let mut chain_tip = match bitcoin_rpc.get_blockchain_info() { + Ok(result) => result.blocks, + Err(e) => { + return Err(format!( + "unable to retrieve Bitcoin chain tip ({})", + e.to_string() + )); } + }; - floating_end_block = update_end_block; - match BlockHeights::BlockRange(start_block, end_block).get_sorted_entries() { - Ok(heights) => heights, - Err(e) => match e { - BlockHeightsError::ExceedsMaxEntries(max, specified) => { - return Err(format!("Chainhook specification exceeds max number of blocks to scan. Maximum: {}, Attempted: {}", max, specified)); - } - BlockHeightsError::StartLargerThanEnd => { - return Err( - "Chainhook specification field `end_block` should be greater than `start_block`." - .into(), - ); - } - }, - } + let block_heights_to_scan = get_block_heights_to_scan( + &predicate_spec.blocks, + &predicate_spec.start_block, + &predicate_spec.end_block, + &chain_tip, + &unfinished_scan_data, + )?; + let mut block_heights_to_scan = match block_heights_to_scan { + Some(h) => h, + // no blocks to scan, go straight to streaming + None => return Ok(false), }; let mut predicates_db_conn = match config.http_api { @@ -142,6 +98,30 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate( let http_client = build_http_client(); while let Some(current_block_height) = block_heights_to_scan.pop_front() { + if current_block_height > chain_tip { + let prev_chain_tip = chain_tip; + // we've scanned up to the chain tip as of the start of this scan + // so see if the chain has progressed since then + chain_tip = match bitcoin_rpc.get_blockchain_info() { + Ok(result) => result.blocks, + Err(e) => { + return Err(format!( + "unable to retrieve Bitcoin chain tip ({})", + e.to_string() + )); + } + }; + // if not, break out so we can enter streaming mode + // and put back the block we weren't able to scan + if current_block_height > chain_tip { + block_heights_to_scan.push_front(current_block_height); + break; + } else { + // if so, update our total number of blocks to scan + number_of_blocks_to_scan += chain_tip - prev_chain_tip; + } + } + number_of_blocks_scanned += 1; let block_hash = retrieve_block_hash_with_retry( @@ -216,30 +196,8 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate( ); } } - - if block_heights_to_scan.is_empty() && floating_end_block { - let new_tip = match bitcoin_rpc.get_blockchain_info() { - Ok(result) => match predicate_spec.end_block { - Some(end_block) => { - if end_block > result.blocks { - result.blocks - } else { - end_block - } - } - None => result.blocks, - }, - Err(_e) => { - continue; - } - }; - - for entry in (current_block_height + 1)..new_tip { - block_heights_to_scan.push_back(entry); - } - number_of_blocks_to_scan += block_heights_to_scan.len() as u64; - } } + info!( ctx.expect_logger(), "{number_of_blocks_scanned} blocks scanned, {actions_triggered} actions triggered" @@ -255,10 +213,9 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate( predicates_db_conn, ctx, ); + // otherwise, we are finished scanning so we can expire the predicate if an end_block was provided if let Some(predicate_end_block) = predicate_spec.end_block { - if predicate_end_block == last_block_scanned.index { - // todo: we need to find a way to check if this block is confirmed - // and if so, set the status to confirmed expiration + if block_heights_to_scan.is_empty() && predicate_end_block == last_block_scanned.index { set_unconfirmed_expiration_status( &Chain::Bitcoin, number_of_blocks_scanned, diff --git a/components/chainhook-cli/src/scan/stacks.rs b/components/chainhook-cli/src/scan/stacks.rs index 396570d64..4daecca8d 100644 --- a/components/chainhook-cli/src/scan/stacks.rs +++ b/components/chainhook-cli/src/scan/stacks.rs @@ -3,6 +3,7 @@ use std::collections::{HashMap, VecDeque}; use crate::{ archive::download_stacks_dataset_if_required, config::{Config, PredicatesApi}, + scan::common::get_block_heights_to_scan, service::{ open_readwrite_predicates_db_conn_or_panic, set_confirmed_expiration_status, set_predicate_scanning_status, set_unconfirmed_expiration_status, ScanningData, @@ -13,10 +14,11 @@ use crate::{ open_readonly_stacks_db_conn_with_retry, open_readwrite_stacks_db_conn, }, }; +use chainhook_sdk::types::{BlockIdentifier, Chain}; use chainhook_sdk::{ chainhooks::stacks::evaluate_stacks_chainhook_on_blocks, indexer::{self, stacks::standardize_stacks_serialized_block_header, Indexer}, - utils::{BlockHeights, Context}, + utils::Context, }; use chainhook_sdk::{ chainhooks::{ @@ -25,10 +27,6 @@ use chainhook_sdk::{ }, utils::{file_append, send_request, AbstractStacksBlock}, }; -use chainhook_sdk::{ - types::{BlockIdentifier, Chain}, - utils::BlockHeightsError, -}; use rocksdb::DB; #[derive(Debug, Clone, Eq, PartialEq)] @@ -171,75 +169,28 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate( config: &Config, ctx: &Context, ) -> Result<(Option, bool), String> { - let mut floating_end_block = false; - - let mut block_heights_to_scan = if let Some(ref blocks) = predicate_spec.blocks { - match BlockHeights::Blocks(blocks.clone()).get_sorted_entries() { - Ok(heights) => heights, - Err(e) => match e { - BlockHeightsError::ExceedsMaxEntries(max, specified) => { - return Err(format!("Chainhook specification exceeds max number of blocks to scan. Maximum: {}, Attempted: {}", max, specified)); - } - BlockHeightsError::StartLargerThanEnd => unreachable!(), - }, - } - } else { - let start_block = match &unfinished_scan_data { - Some(scan_data) => scan_data.last_evaluated_block_height, - None => predicate_spec.start_block.unwrap_or(0), - }; - let chain_tip = match get_last_unconfirmed_block_height_inserted(stacks_db_conn, ctx) { + let mut chain_tip = match get_last_unconfirmed_block_height_inserted(stacks_db_conn, ctx) { + Some(chain_tip) => chain_tip, + None => match get_last_block_height_inserted(stacks_db_conn, ctx) { Some(chain_tip) => chain_tip, - None => match get_last_block_height_inserted(stacks_db_conn, ctx) { - Some(chain_tip) => chain_tip, - None => { - info!(ctx.expect_logger(), "No blocks inserted in db; cannot determing Stacks chain tip. Skipping scan of predicate {}", predicate_spec.uuid); - return Ok((None, false)); - } - }, - }; - - let (end_block, update_end_block) = if let Some(end_block) = predicate_spec.end_block { - if start_block > end_block { - return Err( - "Chainhook specification field `end_block` should be greater than `start_block`." - .into(), - ); + None => { + info!(ctx.expect_logger(), "No blocks inserted in db; cannot determing Stacks chain tip. Skipping scan of predicate {}", predicate_spec.uuid); + return Ok((None, false)); } - // if the user provided an end block that is above the chain tip, we'll - // only scan up to the chain tip, then go to streaming mode - if end_block > chain_tip { - (chain_tip, true) - } else { - (end_block, false) - } - } else { - (chain_tip, true) - }; - - // we've already made this check with a user-provided end_block. But if the user didn't provide one, or if - // they provided one greater than chain tip, we could still have a start block that's greater then end block. - // but this time, it's not an error, we just want to start streaming. - if start_block > end_block { - info!(ctx.expect_logger(), "Chainhook specification field `start_block` is greater than Stacks chain tip for predicate {}. Switching to streaming mode.", predicate_spec.uuid); - return Ok((None, false)); - } + }, + }; - floating_end_block = update_end_block; - match BlockHeights::BlockRange(start_block, end_block).get_sorted_entries() { - Ok(heights) => heights, - Err(e) => match e { - BlockHeightsError::ExceedsMaxEntries(max, specified) => { - return Err(format!("Chainhook specification exceeds max number of blocks to scan. Maximum: {}, Attempted: {}", max, specified)); - } - BlockHeightsError::StartLargerThanEnd => { - return Err( - "Chainhook specification field `end_block` should be greater than `start_block`." - .into(), - ); - } - }, - } + let block_heights_to_scan = get_block_heights_to_scan( + &predicate_spec.blocks, + &predicate_spec.start_block, + &predicate_spec.end_block, + &chain_tip, + &unfinished_scan_data, + )?; + let mut block_heights_to_scan = match block_heights_to_scan { + Some(h) => h, + // no blocks to scan, go straight to streaming + None => return Ok((None, false)), }; let mut predicates_db_conn = match config.http_api { @@ -270,7 +221,34 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate( }; while let Some(current_block_height) = block_heights_to_scan.pop_front() { + if current_block_height > chain_tip { + let prev_chain_tip = chain_tip; + // we've scanned up to the chain tip as of the start of this scan + // so see if the chain has progressed since then + + chain_tip = match get_last_unconfirmed_block_height_inserted(stacks_db_conn, ctx) { + Some(chain_tip) => chain_tip, + None => match get_last_block_height_inserted(stacks_db_conn, ctx) { + Some(chain_tip) => chain_tip, + None => { + info!(ctx.expect_logger(), "No blocks inserted in db; cannot determing Stacks chain tip. Skipping scan of predicate {}", predicate_spec.uuid); + return Ok((None, false)); + } + }, + }; + // if not, break out so we can enter streaming mode + // and put back the block we weren't able to scan + if current_block_height > chain_tip { + block_heights_to_scan.push_front(current_block_height); + break; + } else { + // if so, update our total number of blocks to scan + number_of_blocks_to_scan += chain_tip - prev_chain_tip; + } + } + number_of_blocks_scanned += 1; + let block_data = match get_stacks_block_at_block_height(current_block_height, true, 3, stacks_db_conn) { Ok(Some(block)) => block, @@ -362,44 +340,6 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate( ); } } - - // Update end_block, in case a new block was discovered during the scan - if block_heights_to_scan.is_empty() && floating_end_block { - let new_tip = match predicate_spec.end_block { - Some(end_block) => { - match get_last_unconfirmed_block_height_inserted(stacks_db_conn, ctx) { - Some(chain_tip) => { - if end_block > chain_tip { - chain_tip - } else { - end_block - } - } - None => match get_last_block_height_inserted(stacks_db_conn, ctx) { - Some(chain_tip) => { - if end_block > chain_tip { - chain_tip - } else { - end_block - } - } - None => current_block_height, - }, - } - } - None => match get_last_unconfirmed_block_height_inserted(stacks_db_conn, ctx) { - Some(end_block) => end_block, - None => match get_last_block_height_inserted(stacks_db_conn, ctx) { - Some(end_block) => end_block, - None => current_block_height, - }, - }, - }; - for entry in (current_block_height + 1)..=new_tip { - block_heights_to_scan.push_back(entry); - } - number_of_blocks_to_scan += block_heights_to_scan.len() as u64; - } } info!( ctx.expect_logger(), @@ -417,7 +357,7 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate( ctx, ); if let Some(predicate_end_block) = predicate_spec.end_block { - if predicate_end_block == last_block_scanned.index { + if block_heights_to_scan.is_empty() && predicate_end_block == last_block_scanned.index { let is_confirmed = match get_stacks_block_at_block_height( predicate_end_block, true,