Skip to content

Commit

Permalink
refactor(esplora): Simplify chain update logic
Browse files Browse the repository at this point in the history
I felt things didn't have to be so complicated. The chain could easily
be updated in one function rather than spread across two. The logic
needs at least one less loop.
  • Loading branch information
LLFourn committed Apr 2, 2024
1 parent f04207d commit 54c5189
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 137 deletions.
38 changes: 38 additions & 0 deletions crates/chain/src/local_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,44 @@ impl CheckPoint {
pub fn query_from(&self, height: u32) -> Option<Self> {
self.iter().take_while(|cp| cp.height() >= height).last()
}

/// Inserts `block_id` at its height within the chain.
///
/// The effect of `insert` depends on whether a height already exists. If it doesn't the
/// `block_id` we inserted and all pre-existing blocks higher than it will be re-inserted after
/// it. If the height already existed and has a conflicting block hash then it will be purged
/// along with all block followin it. The returned chain will have a tip of the `block_id`
/// passed in. Of course, if the `block_id` was already present then this just returns `self`.
#[must_use]
pub fn insert(self, block_id: BlockId) -> Self {
assert_ne!(block_id.height, 0, "cannot insert the genesis block");

let mut cp = self.clone();
let mut tail = vec![];
let base = loop {
if cp.height() == block_id.height {
if cp.hash() == block_id.hash {
return self;
} else {
// if we have a conflict we just return the inserted block because the tail is by
// implication invalid.
tail = vec![];
break cp.prev().expect("can't be called on genesis block");
}
} else if cp.height() < block_id.height {
break cp;
} else {
tail.push(cp.block_id());
}
cp = cp.prev().expect("will break before genesis block");
};

let tip = base
.extend(core::iter::once(block_id).chain(tail.into_iter().rev()))
.expect("tail is in order");

tip
}
}

/// Iterates over checkpoints backwards.
Expand Down
152 changes: 40 additions & 112 deletions crates/esplora/src/blocking_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ use std::collections::BTreeSet;
use std::thread::JoinHandle;
use std::usize;

use bdk_chain::collections::btree_map;
use bdk_chain::collections::BTreeMap;
use bdk_chain::Anchor;
use bdk_chain::{
bitcoin::{BlockHash, OutPoint, ScriptBuf, TxOut, Txid},
bitcoin::{OutPoint, ScriptBuf, TxOut, Txid},
local_chain::{self, CheckPoint},
BlockId, ConfirmationTimeHeightAnchor, TxGraph,
};
Expand Down Expand Up @@ -76,19 +75,13 @@ impl EsploraExt for esplora_client::BlockingClient {
stop_gap: usize,
parallel_requests: usize,
) -> Result<FullScanUpdate<K>, Error> {
let update_blocks = init_chain_update_blocking(self, &local_tip)?;
let (tx_graph, last_active_indices) = full_scan_for_index_and_graph_blocking(
self,
keychain_spks,
stop_gap,
parallel_requests,
)?;
let local_chain = finalize_chain_update_blocking(
self,
&local_tip,
tx_graph.all_anchors(),
update_blocks,
)?;
let local_chain = chain_update_blocking(self, &local_tip, tx_graph.all_anchors())?;
Ok(FullScanUpdate {
local_chain,
tx_graph,
Expand All @@ -104,137 +97,72 @@ impl EsploraExt for esplora_client::BlockingClient {
outpoints: impl IntoIterator<Item = OutPoint>,
parallel_requests: usize,
) -> Result<SyncUpdate, Error> {
let update_blocks = init_chain_update_blocking(self, &local_tip)?;
let tx_graph = sync_for_index_and_graph_blocking(
self,
misc_spks,
txids,
outpoints,
parallel_requests,
)?;
let local_chain = finalize_chain_update_blocking(
self,
&local_tip,
tx_graph.all_anchors(),
update_blocks,
)?;
let local_chain = chain_update_blocking(self, &local_tip, tx_graph.all_anchors())?;
Ok(SyncUpdate {
local_chain,
tx_graph,
})
}
}

/// Create the initial chain update.
///
/// This atomically fetches the latest blocks from Esplora and additional blocks to ensure the
/// update can connect to the `start_tip`.
///
/// We want to do this before fetching transactions and anchors as we cannot fetch latest blocks and
/// transactions atomically, and the checkpoint tip is used to determine last-scanned block (for
/// block-based chain-sources). Therefore it's better to be conservative when setting the tip (use
/// an earlier tip rather than a later tip) otherwise the caller may accidentally skip blocks when
/// alternating between chain-sources.
/// Updates the chain making sure to include heights for the anchors
#[doc(hidden)]
pub fn init_chain_update_blocking(
pub fn chain_update_blocking<A: Anchor>(
client: &esplora_client::BlockingClient,
local_tip: &CheckPoint,
) -> Result<BTreeMap<u32, BlockHash>, Error> {
// Fetch latest N (server dependent) blocks from Esplora. The server guarantees these are
// consistent.
let mut fetched_blocks = client
.get_blocks(None)?
.into_iter()
.map(|b| (b.time.height, b.id))
.collect::<BTreeMap<u32, BlockHash>>();
let new_tip_height = fetched_blocks
.keys()
.last()
.copied()
.expect("must atleast have one block");

// Ensure `fetched_blocks` can create an update that connects with the original chain by
// finding a "Point of Agreement".
for (height, local_hash) in local_tip.iter().map(|cp| (cp.height(), cp.hash())) {
if height > new_tip_height {
continue;
}

let fetched_hash = match fetched_blocks.entry(height) {
btree_map::Entry::Occupied(entry) => *entry.get(),
btree_map::Entry::Vacant(entry) => *entry.insert(client.get_block_hash(height)?),
};
anchors: &BTreeSet<(A, Txid)>,
) -> Result<local_chain::Update, Error> {
let mut point_of_agreement = None;
let mut conflicts = vec![];
for local_cp in local_tip.iter() {
let remote_hash = client.get_block_hash(local_cp.height())?;

// We have found point of agreement so the update will connect!
if fetched_hash == local_hash {
if remote_hash == local_cp.hash() {
point_of_agreement = Some(local_cp.clone());
break;
} else {
// it is not strictly necessary to include all the conflicted heights (we do need the
// first one) but it seems prudent to make sure the updated chain's heights are a
// superset of the existing chain after update.
conflicts.push(BlockId {
height: local_cp.height(),
hash: remote_hash,
});
}
}

Ok(fetched_blocks)
}

/// Fetches missing checkpoints and finalizes the [`local_chain::Update`].
///
/// A checkpoint is considered "missing" if an anchor (of `anchors`) points to a height without an
/// existing checkpoint/block under `local_tip` or `update_blocks`.
#[doc(hidden)]
pub fn finalize_chain_update_blocking<A: Anchor>(
client: &esplora_client::BlockingClient,
local_tip: &CheckPoint,
anchors: &BTreeSet<(A, Txid)>,
mut update_blocks: BTreeMap<u32, BlockHash>,
) -> Result<local_chain::Update, Error> {
let update_tip_height = update_blocks
.keys()
.last()
.copied()
.expect("must atleast have one block");
let mut tip = point_of_agreement.expect("remote esplora should have same genesis block");

// We want to have a corresponding checkpoint per height. We iterate the heights of anchors
// backwards, comparing it against our `local_tip`'s chain and our current set of
// `update_blocks` to see if a corresponding checkpoint already exists.
let anchor_heights = anchors
.iter()
.map(|(a, _)| a.anchor_block().height)
// filter out duplicate heights
.filter({
let mut prev_height = Option::<u32>::None;
move |h| match prev_height.replace(*h) {
None => true,
Some(prev_h) => prev_h != *h,
}
})
// filter out heights that surpass the update tip
.filter(|h| *h <= update_tip_height)
.rev();

// We keep track of a checkpoint node of `local_tip` to make traversing the linked-list of
// checkpoints more efficient.
let mut curr_cp = local_tip.clone();
tip = tip
.extend(conflicts.into_iter().rev())
.expect("evicted are in order");

for h in anchor_heights {
if let Some(cp) = curr_cp.query_from(h) {
curr_cp = cp.clone();
if cp.height() == h {
// blocks that already exist in checkpoint linked-list is also stored in
// `update_blocks` because we want to keep higher checkpoints of `local_chain`
update_blocks.insert(h, cp.hash());
continue;
}
}
if let btree_map::Entry::Vacant(entry) = update_blocks.entry(h) {
entry.insert(client.get_block_hash(h)?);
for anchor in anchors {
let height = anchor.0.anchor_block().height;
if tip.query(height).is_none() {
let hash = client.get_block_hash(height)?;
tip = tip.insert(BlockId { height, hash });
}
}

// insert the most recent blocks at the tip to make sure we update the tip and make the update
// robust.
for block in client.get_blocks(None)? {
tip = tip.insert(BlockId {
height: block.time.height,
hash: block.id,
});
}

Ok(local_chain::Update {
tip: CheckPoint::from_block_ids(
update_blocks
.into_iter()
.map(|(height, hash)| BlockId { height, hash }),
)
.expect("must be in order"),
tip,
introduce_older_blocks: true,
})
}
Expand Down
30 changes: 5 additions & 25 deletions crates/esplora/tests/blocking_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ pub fn test_finalize_chain_update() -> anyhow::Result<()> {
let local_chain = {
let (mut chain, _) = LocalChain::from_genesis_hash(env.genesis_hash()?);
let chain_tip = chain.tip();
let update_blocks = bdk_esplora::init_chain_update_blocking(&client, &chain_tip)?;
let update_anchors = t
.initial_cps
.iter()
Expand All @@ -103,12 +102,8 @@ pub fn test_finalize_chain_update() -> anyhow::Result<()> {
))
})
.collect::<anyhow::Result<BTreeSet<_>>>()?;
let chain_update = bdk_esplora::finalize_chain_update_blocking(
&client,
&chain_tip,
&update_anchors,
update_blocks,
)?;
let chain_update =
bdk_esplora::chain_update_blocking(&client, &chain_tip, &update_anchors)?;
chain.apply_update(chain_update)?;
chain
};
Expand All @@ -128,7 +123,6 @@ pub fn test_finalize_chain_update() -> anyhow::Result<()> {
// craft update
let update = {
let local_tip = local_chain.tip();
let update_blocks = bdk_esplora::init_chain_update_blocking(&client, &local_tip)?;
let update_anchors = t
.anchors
.iter()
Expand All @@ -142,12 +136,7 @@ pub fn test_finalize_chain_update() -> anyhow::Result<()> {
))
})
.collect::<anyhow::Result<_>>()?;
bdk_esplora::finalize_chain_update_blocking(
&client,
&local_tip,
&update_anchors,
update_blocks,
)?
bdk_esplora::chain_update_blocking(&client, &local_tip, &update_anchors)?
};

// apply update
Expand Down Expand Up @@ -523,11 +512,6 @@ fn update_local_chain() -> anyhow::Result<()> {
let mut chain = t.chain;
let cp_tip = chain.tip();

let new_blocks =
bdk_esplora::init_chain_update_blocking(&client, &cp_tip).map_err(|err| {
anyhow::format_err!("[{}:{}] `init_chain_update` failed: {}", i, t.name, err)
})?;

let mock_anchors = t
.request_heights
.iter()
Expand All @@ -546,12 +530,7 @@ fn update_local_chain() -> anyhow::Result<()> {
})
.collect::<BTreeSet<_>>();

let chain_update = bdk_esplora::finalize_chain_update_blocking(
&client,
&cp_tip,
&mock_anchors,
new_blocks,
)?;
let chain_update = bdk_esplora::chain_update_blocking(&client, &cp_tip, &mock_anchors)?;
let update_blocks = chain_update
.tip
.iter()
Expand All @@ -574,6 +553,7 @@ fn update_local_chain() -> anyhow::Result<()> {
)
.collect::<BTreeSet<_>>();

dbg!(&update_blocks, &exp_update_blocks);
assert!(
update_blocks.is_superset(&exp_update_blocks),
"[{}:{}] unexpected update",
Expand Down

0 comments on commit 54c5189

Please sign in to comment.