Skip to content

Commit

Permalink
use helper for getting block heights to scan
Browse files Browse the repository at this point in the history
  • Loading branch information
MicaiahReid committed Feb 6, 2024
1 parent 319ef6b commit 027b3b9
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 201 deletions.
139 changes: 48 additions & 91 deletions components/chainhook-cli/src/scan/bitcoin.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::config::{Config, PredicatesApi};
use crate::scan::common::get_block_heights_to_scan;
use crate::service::{
open_readwrite_predicates_db_conn_or_panic, set_confirmed_expiration_status,
set_predicate_scanning_status, set_unconfirmed_expiration_status, ScanningData,
Expand All @@ -19,7 +20,7 @@ use chainhook_sdk::observer::{gather_proofs, EventObserverConfig};
use chainhook_sdk::types::{
BitcoinBlockData, BitcoinChainEvent, BitcoinChainUpdatedWithBlocksData, BlockIdentifier, Chain,
};
use chainhook_sdk::utils::{file_append, send_request, BlockHeights, BlockHeightsError, Context};
use chainhook_sdk::utils::{file_append, send_request, Context};
use std::collections::HashMap;

pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
Expand All @@ -39,73 +40,28 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
return Err(format!("Bitcoin RPC error: {}", message.to_string()));
}
};
let mut floating_end_block = false;

let mut block_heights_to_scan = if let Some(ref blocks) = predicate_spec.blocks {
match BlockHeights::Blocks(blocks.clone()).get_sorted_entries() {
Ok(heights) => heights,
Err(e) => match e {
BlockHeightsError::ExceedsMaxEntries(max, specified) => {
return Err(format!("Chainhook specification exceeds max number of blocks to scan. Maximum: {}, Attempted: {}", max, specified));
}
BlockHeightsError::StartLargerThanEnd => unreachable!(),
},
}
} else {
let start_block = match &unfinished_scan_data {
Some(scan_data) => scan_data.last_evaluated_block_height,
None => predicate_spec.start_block.unwrap_or(0),
};
let chain_tip = match bitcoin_rpc.get_blockchain_info() {
Ok(result) => result.blocks,
Err(e) => {
return Err(format!(
"unable to retrieve Bitcoin chain tip ({})",
e.to_string()
));
}
};
let (end_block, update_end_block) = if let Some(end_block) = predicate_spec.end_block {
if start_block > end_block {
return Err(
"Chainhook specification field `end_block` should be greater than `start_block`."
.into(),
);
}
// if the user provided an end block that is above the chain tip, we'll
// only scan up to the chain tip, then go to streaming mode
if end_block > chain_tip {
(chain_tip, true)
} else {
(end_block, false)
}
} else {
(chain_tip, true)
};

// we've already made this check with a user-provided end_block. But if the user didn't provide one, or if
// they provided one greater than chain tip, we could still have a start block that's greater then end block.
// but this time, it's not an error, we just want to start streaming.
if start_block > end_block {
info!(ctx.expect_logger(), "Chainhook specification field `start_block` is greater than Bitcoin chain tip for predicate {}. Switching to streaming mode.", predicate_spec.uuid);
return Ok(false);
let mut chain_tip = match bitcoin_rpc.get_blockchain_info() {
Ok(result) => result.blocks,
Err(e) => {
return Err(format!(
"unable to retrieve Bitcoin chain tip ({})",
e.to_string()
));
}
};

floating_end_block = update_end_block;
match BlockHeights::BlockRange(start_block, end_block).get_sorted_entries() {
Ok(heights) => heights,
Err(e) => match e {
BlockHeightsError::ExceedsMaxEntries(max, specified) => {
return Err(format!("Chainhook specification exceeds max number of blocks to scan. Maximum: {}, Attempted: {}", max, specified));
}
BlockHeightsError::StartLargerThanEnd => {
return Err(
"Chainhook specification field `end_block` should be greater than `start_block`."
.into(),
);
}
},
}
let block_heights_to_scan = get_block_heights_to_scan(
&predicate_spec.blocks,
&predicate_spec.start_block,
&predicate_spec.end_block,
&chain_tip,
&unfinished_scan_data,
)?;
let mut block_heights_to_scan = match block_heights_to_scan {
Some(h) => h,
// no blocks to scan, go straight to streaming
None => return Ok(false),
};

let mut predicates_db_conn = match config.http_api {
Expand Down Expand Up @@ -142,6 +98,30 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
let http_client = build_http_client();

while let Some(current_block_height) = block_heights_to_scan.pop_front() {
if current_block_height > chain_tip {
let prev_chain_tip = chain_tip;
// we've scanned up to the chain tip as of the start of this scan
// so see if the chain has progressed since then
chain_tip = match bitcoin_rpc.get_blockchain_info() {
Ok(result) => result.blocks,
Err(e) => {
return Err(format!(
"unable to retrieve Bitcoin chain tip ({})",
e.to_string()
));
}
};
// if not, break out so we can enter streaming mode
// and put back the block we weren't able to scan
if current_block_height > chain_tip {
block_heights_to_scan.push_front(current_block_height);
break;
} else {
// if so, update our total number of blocks to scan
number_of_blocks_to_scan += chain_tip - prev_chain_tip;
}
}

number_of_blocks_scanned += 1;

let block_hash = retrieve_block_hash_with_retry(
Expand Down Expand Up @@ -216,30 +196,8 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
);
}
}

if block_heights_to_scan.is_empty() && floating_end_block {
let new_tip = match bitcoin_rpc.get_blockchain_info() {
Ok(result) => match predicate_spec.end_block {
Some(end_block) => {
if end_block > result.blocks {
result.blocks
} else {
end_block
}
}
None => result.blocks,
},
Err(_e) => {
continue;
}
};

for entry in (current_block_height + 1)..new_tip {
block_heights_to_scan.push_back(entry);
}
number_of_blocks_to_scan += block_heights_to_scan.len() as u64;
}
}

info!(
ctx.expect_logger(),
"{number_of_blocks_scanned} blocks scanned, {actions_triggered} actions triggered"
Expand All @@ -255,10 +213,9 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
predicates_db_conn,
ctx,
);
// otherwise, we are finished scanning so we can expire the predicate if an end_block was provided
if let Some(predicate_end_block) = predicate_spec.end_block {
if predicate_end_block == last_block_scanned.index {
// todo: we need to find a way to check if this block is confirmed
// and if so, set the status to confirmed expiration
if block_heights_to_scan.is_empty() && predicate_end_block == last_block_scanned.index {
set_unconfirmed_expiration_status(
&Chain::Bitcoin,
number_of_blocks_scanned,
Expand Down
160 changes: 50 additions & 110 deletions components/chainhook-cli/src/scan/stacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::collections::{HashMap, VecDeque};
use crate::{
archive::download_stacks_dataset_if_required,
config::{Config, PredicatesApi},
scan::common::get_block_heights_to_scan,
service::{
open_readwrite_predicates_db_conn_or_panic, set_confirmed_expiration_status,
set_predicate_scanning_status, set_unconfirmed_expiration_status, ScanningData,
Expand All @@ -13,10 +14,11 @@ use crate::{
open_readonly_stacks_db_conn_with_retry, open_readwrite_stacks_db_conn,
},
};
use chainhook_sdk::types::{BlockIdentifier, Chain};
use chainhook_sdk::{
chainhooks::stacks::evaluate_stacks_chainhook_on_blocks,
indexer::{self, stacks::standardize_stacks_serialized_block_header, Indexer},
utils::{BlockHeights, Context},
utils::Context,
};
use chainhook_sdk::{
chainhooks::{
Expand All @@ -25,10 +27,6 @@ use chainhook_sdk::{
},
utils::{file_append, send_request, AbstractStacksBlock},
};
use chainhook_sdk::{
types::{BlockIdentifier, Chain},
utils::BlockHeightsError,
};
use rocksdb::DB;

#[derive(Debug, Clone, Eq, PartialEq)]
Expand Down Expand Up @@ -171,75 +169,28 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
config: &Config,
ctx: &Context,
) -> Result<(Option<BlockIdentifier>, bool), String> {
let mut floating_end_block = false;

let mut block_heights_to_scan = if let Some(ref blocks) = predicate_spec.blocks {
match BlockHeights::Blocks(blocks.clone()).get_sorted_entries() {
Ok(heights) => heights,
Err(e) => match e {
BlockHeightsError::ExceedsMaxEntries(max, specified) => {
return Err(format!("Chainhook specification exceeds max number of blocks to scan. Maximum: {}, Attempted: {}", max, specified));
}
BlockHeightsError::StartLargerThanEnd => unreachable!(),
},
}
} else {
let start_block = match &unfinished_scan_data {
Some(scan_data) => scan_data.last_evaluated_block_height,
None => predicate_spec.start_block.unwrap_or(0),
};
let chain_tip = match get_last_unconfirmed_block_height_inserted(stacks_db_conn, ctx) {
let mut chain_tip = match get_last_unconfirmed_block_height_inserted(stacks_db_conn, ctx) {
Some(chain_tip) => chain_tip,
None => match get_last_block_height_inserted(stacks_db_conn, ctx) {
Some(chain_tip) => chain_tip,
None => match get_last_block_height_inserted(stacks_db_conn, ctx) {
Some(chain_tip) => chain_tip,
None => {
info!(ctx.expect_logger(), "No blocks inserted in db; cannot determing Stacks chain tip. Skipping scan of predicate {}", predicate_spec.uuid);
return Ok((None, false));
}
},
};

let (end_block, update_end_block) = if let Some(end_block) = predicate_spec.end_block {
if start_block > end_block {
return Err(
"Chainhook specification field `end_block` should be greater than `start_block`."
.into(),
);
None => {
info!(ctx.expect_logger(), "No blocks inserted in db; cannot determing Stacks chain tip. Skipping scan of predicate {}", predicate_spec.uuid);
return Ok((None, false));
}
// if the user provided an end block that is above the chain tip, we'll
// only scan up to the chain tip, then go to streaming mode
if end_block > chain_tip {
(chain_tip, true)
} else {
(end_block, false)
}
} else {
(chain_tip, true)
};

// we've already made this check with a user-provided end_block. But if the user didn't provide one, or if
// they provided one greater than chain tip, we could still have a start block that's greater then end block.
// but this time, it's not an error, we just want to start streaming.
if start_block > end_block {
info!(ctx.expect_logger(), "Chainhook specification field `start_block` is greater than Stacks chain tip for predicate {}. Switching to streaming mode.", predicate_spec.uuid);
return Ok((None, false));
}
},
};

floating_end_block = update_end_block;
match BlockHeights::BlockRange(start_block, end_block).get_sorted_entries() {
Ok(heights) => heights,
Err(e) => match e {
BlockHeightsError::ExceedsMaxEntries(max, specified) => {
return Err(format!("Chainhook specification exceeds max number of blocks to scan. Maximum: {}, Attempted: {}", max, specified));
}
BlockHeightsError::StartLargerThanEnd => {
return Err(
"Chainhook specification field `end_block` should be greater than `start_block`."
.into(),
);
}
},
}
let block_heights_to_scan = get_block_heights_to_scan(
&predicate_spec.blocks,
&predicate_spec.start_block,
&predicate_spec.end_block,
&chain_tip,
&unfinished_scan_data,
)?;
let mut block_heights_to_scan = match block_heights_to_scan {
Some(h) => h,
// no blocks to scan, go straight to streaming
None => return Ok((None, false)),
};

let mut predicates_db_conn = match config.http_api {
Expand Down Expand Up @@ -270,7 +221,34 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
};

while let Some(current_block_height) = block_heights_to_scan.pop_front() {
if current_block_height > chain_tip {
let prev_chain_tip = chain_tip;
// we've scanned up to the chain tip as of the start of this scan
// so see if the chain has progressed since then

chain_tip = match get_last_unconfirmed_block_height_inserted(stacks_db_conn, ctx) {
Some(chain_tip) => chain_tip,
None => match get_last_block_height_inserted(stacks_db_conn, ctx) {
Some(chain_tip) => chain_tip,
None => {
info!(ctx.expect_logger(), "No blocks inserted in db; cannot determing Stacks chain tip. Skipping scan of predicate {}", predicate_spec.uuid);
return Ok((None, false));
}
},
};
// if not, break out so we can enter streaming mode
// and put back the block we weren't able to scan
if current_block_height > chain_tip {
block_heights_to_scan.push_front(current_block_height);
break;
} else {
// if so, update our total number of blocks to scan
number_of_blocks_to_scan += chain_tip - prev_chain_tip;
}
}

number_of_blocks_scanned += 1;

let block_data =
match get_stacks_block_at_block_height(current_block_height, true, 3, stacks_db_conn) {
Ok(Some(block)) => block,
Expand Down Expand Up @@ -362,44 +340,6 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
);
}
}

// Update end_block, in case a new block was discovered during the scan
if block_heights_to_scan.is_empty() && floating_end_block {
let new_tip = match predicate_spec.end_block {
Some(end_block) => {
match get_last_unconfirmed_block_height_inserted(stacks_db_conn, ctx) {
Some(chain_tip) => {
if end_block > chain_tip {
chain_tip
} else {
end_block
}
}
None => match get_last_block_height_inserted(stacks_db_conn, ctx) {
Some(chain_tip) => {
if end_block > chain_tip {
chain_tip
} else {
end_block
}
}
None => current_block_height,
},
}
}
None => match get_last_unconfirmed_block_height_inserted(stacks_db_conn, ctx) {
Some(end_block) => end_block,
None => match get_last_block_height_inserted(stacks_db_conn, ctx) {
Some(end_block) => end_block,
None => current_block_height,
},
},
};
for entry in (current_block_height + 1)..=new_tip {
block_heights_to_scan.push_back(entry);
}
number_of_blocks_to_scan += block_heights_to_scan.len() as u64;
}
}
info!(
ctx.expect_logger(),
Expand All @@ -417,7 +357,7 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
ctx,
);
if let Some(predicate_end_block) = predicate_spec.end_block {
if predicate_end_block == last_block_scanned.index {
if block_heights_to_scan.is_empty() && predicate_end_block == last_block_scanned.index {
let is_confirmed = match get_stacks_block_at_block_height(
predicate_end_block,
true,
Expand Down

0 comments on commit 027b3b9

Please sign in to comment.