Skip to content

Commit

Permalink
Add remote block import provider
Browse files Browse the repository at this point in the history
  • Loading branch information
liuchengxu committed Dec 24, 2024
1 parent 0a60488 commit 503e5be
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 33 deletions.
2 changes: 2 additions & 0 deletions crates/subcoin-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ hex = { workspace = true }
indicatif = { workspace = true }
jsonrpsee = { workspace = true }
pallet-bitcoin = { workspace = true }
reqwest = { version = "0.12.9", features = ["json"] }
sc-cli = { workspace = true }
sc-client-api = { workspace = true }
sc-consensus = { workspace = true }
Expand Down Expand Up @@ -55,6 +56,7 @@ subcoin-service = { workspace = true }
subcoin-utxo-snapshot = { workspace = true }
substrate-frame-rpc-system = { workspace = true }
substrate-prometheus-endpoint = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }
txoutset = { workspace = true }

Expand Down
94 changes: 61 additions & 33 deletions crates/subcoin-node/src/commands/import_blocks.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::cli::subcoin_params::CommonParams;
use crate::rpc_client::BlockstreamClient;
use crate::utils::Yield;
use bitcoin_explorer::BitcoinDB;
use futures::FutureExt;
Expand All @@ -17,14 +18,17 @@ use subcoin_primitives::BackendExt;
use subcoin_runtime::interface::OpaqueBlock;
use subcoin_service::FullClient;

/// Import Bitcoin blocks from bitcoind database.
/// Import Bitcoin blocks into the node.
#[derive(clap::Parser, Debug, Clone)]
pub struct ImportBlocks {
/// Path to the bitcoind database.
///
/// This corresponds to the value of the `-data-dir` argument in the bitcoind program.
#[clap(index = 1, value_parser)]
pub data_dir: PathBuf,
///
/// The blocks will be fetched from remote using the blockstream API if this argument
/// is not specified. Note that using the remote source is only for testing purpose.
#[clap(long, value_parser)]
pub data_dir: Option<PathBuf>,

/// Number of blocks to import.
///
Expand Down Expand Up @@ -80,21 +84,19 @@ impl ImportBlocksCmd {
pub async fn run<'a>(
&self,
client: Arc<FullClient>,
data_dir: PathBuf,
maybe_data_dir: Option<PathBuf>,
import_config: ImportConfig,
spawn_handle: SpawnTaskHandle,
maybe_prometheus_config: Option<PrometheusConfig>,
) -> sc_cli::Result<()> {
let from = (client.info().best_number + 1) as usize;
let block_provider = BitcoinBlockProvider::new(maybe_data_dir)?;

let block_provider = BlockProvider::from_local(&data_dir)?;
let max = block_provider.block_count();
let max = block_provider.block_count().await;
let to = self.to.unwrap_or(max).min(max);

tracing::info!(
"Start to import blocks from #{from} to #{to} from bitcoind database: {}",
data_dir.display()
);
let from = (client.info().best_number + 1) as usize;

tracing::info!("Start to import blocks from #{from} to #{to}",);

const INTERVAL: Duration = Duration::from_secs(1);

Expand Down Expand Up @@ -128,7 +130,7 @@ impl ImportBlocksCmd {
}

for index in from..=to {
let block = block_provider.block_at(index)?;
let block = block_provider.block_at(index).await?;
bitcoin_block_import
.import_block(block, BlockOrigin::Own)
.await
Expand All @@ -152,7 +154,11 @@ impl ImportBlocksCmd {
)
});

let speed = speed::<OpaqueBlock>(best_number, last_number, last_update);
let speed = calculate_import_speed::<OpaqueBlock>(
best_number,
last_number,
last_update,
);

tracing::info!(
"Imported {total_imported} blocks,{speed}, best#{best_number},{bitcoin_block_hash} ({substrate_block_hash})",
Expand Down Expand Up @@ -186,7 +192,7 @@ impl ImportBlocksCmd {

/// Calculates `(best_number - last_number) / (now - last_update)` and returns a `String`
/// representing the speed of import.
fn speed<B: BlockT>(
fn calculate_import_speed<B: BlockT>(
best_number: NumberFor<B>,
last_number: Option<NumberFor<B>>,
last_update: Instant,
Expand Down Expand Up @@ -243,37 +249,59 @@ impl sc_cli::CliConfiguration for ImportBlocksCmd {
}
}

enum BlockProvider {
enum BitcoinBlockProvider {
/// Local bitcoind database.
Local(BitcoinDB),
/// Remote source.
Remote(BlockstreamClient),
}

impl BlockProvider {
fn from_local(path: impl AsRef<Path>) -> sc_cli::Result<Self> {
let db = BitcoinDB::new(path.as_ref(), true)
.map_err(|err| sc_cli::Error::Application(Box::new(err)))?;
Ok(Self::Local(db))
impl BitcoinBlockProvider {
fn new(maybe_data_dir: Option<PathBuf>) -> sc_cli::Result<Self> {
match maybe_data_dir {
Some(data_dir) => {
tracing::info!("Using local bitcoind database: {}", data_dir.display());
let db = BitcoinDB::new(data_dir.as_ref(), true)
.map_err(|err| sc_cli::Error::Application(Box::new(err)))?;
Ok(Self::Local(db))
}
None => {
tracing::info!("Using remote block provider.");
Ok(Self::Remote(BlockstreamClient::new()))
}
}
}

fn block_at(&self, height: usize) -> sc_cli::Result<bitcoin::Block> {
async fn block_at(&self, height: usize) -> sc_cli::Result<bitcoin::Block> {
use bitcoin::consensus::Decodable;

let raw_block = match self {
Self::Local(db) => db.get_raw_block(height).map_err(|err| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to get bitcoin block at #{height}: {err}"),
)
})?,
};

Ok(bitcoin::Block::consensus_decode(&mut raw_block.as_slice())
.expect("Bad block in the database"))
match self {
Self::Local(db) => {
let raw_block = db.get_raw_block(height).map_err(|err| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to get bitcoin block at #{height}: {err}"),
)
})?;

Ok(bitcoin::Block::consensus_decode(&mut raw_block.as_slice())
.expect("Bad block in the database"))
}
Self::Remote(rpc_client) => rpc_client
.get_block_by_height(height as u32)
.await
.map_err(|err| sc_cli::Error::Application(Box::new(err))),
}
}

fn block_count(&self) -> usize {
async fn block_count(&self) -> usize {
match self {
Self::Local(db) => db.get_block_count(),
Self::Remote(rpc_client) => rpc_client
.get_tip_height()
.await
.expect("Failed to fetch tip height")
as usize,
}
}
}
1 change: 1 addition & 0 deletions crates/subcoin-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
mod cli;
mod commands;
mod rpc;
mod rpc_client;
mod substrate_cli;
mod utils;

Expand Down
88 changes: 88 additions & 0 deletions crates/subcoin-node/src/rpc_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use bitcoin::consensus::Decodable;
use bitcoin::{Block, BlockHash};
use reqwest::Client;
use thiserror::Error;

const BLOCKSTREAM_API_URL: &str = "https://blockstream.info/api";

#[derive(Debug, Error)]
pub enum ApiError {
#[error("Invalid block hash")]
InvalidBlockHash,
#[error("Invalid block number")]
InvalidBlockNumber,
#[error("HTTP request failed: {0}")]
HttpRequestError(#[from] reqwest::Error),
#[error(transparent)]
BitcoinIO(#[from] bitcoin::io::Error),
#[error(transparent)]
BitcoinEncode(#[from] bitcoin::consensus::encode::Error),
}

/// Client for interacting with the Blockstream API.
pub struct BlockstreamClient {
client: Client,
}

impl BlockstreamClient {
/// Create a new instance of [`BlockstreamClient`].
pub fn new() -> Self {
Self {
client: Client::new(),
}
}

/// Fetch the height of the latest block.
pub async fn get_tip_height(&self) -> Result<u32, ApiError> {
let url = format!("{BLOCKSTREAM_API_URL}/blocks/tip/height");
let response = self.client.get(&url).send().await?;
let height = response.text().await?;
height.parse().map_err(|_| ApiError::InvalidBlockNumber)
}

/// Fetch the hash of the latest block (tip hash).
pub async fn get_tip_hash(&self) -> Result<BlockHash, ApiError> {
let url = format!("{BLOCKSTREAM_API_URL}/blocks/tip/hash");
let response = self.client.get(&url).send().await?;
let hash = response.text().await?;
hash.parse().map_err(|_| ApiError::InvalidBlockHash)
}

/// Fetch hash of the block specified by the height.
pub async fn get_block_hash(&self, height: u32) -> Result<BlockHash, ApiError> {
let url = format!("{BLOCKSTREAM_API_URL}/block-height/{height}");
let hash = self.client.get(&url).send().await?.text().await?;
hash.parse().map_err(|_| ApiError::InvalidBlockHash)
}

/// Fetch block by its height.
pub async fn get_block_by_height(&self, height: u32) -> Result<Block, ApiError> {
let hash = self.get_block_hash(height).await?;
self.get_block(hash).await
}

/// Fetch block by its hash.
pub async fn get_block(&self, hash: BlockHash) -> Result<Block, ApiError> {
let url = format!("{BLOCKSTREAM_API_URL}/block/{hash}/raw");
let raw_block = self.client.get(&url).send().await?.bytes().await?;
let block = bitcoin::Block::consensus_decode(&mut raw_block.as_ref())?;
Ok(block)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn test_blockstream_client() {
let client = BlockstreamClient::new();
assert_eq!(
client.get_block_hash(100).await.unwrap(),
"000000007bc154e0fa7ea32218a72fe2c1bb9f86cf8c9ebf9a715ed27fdb229a"
.parse()
.unwrap()
);
}
}

0 comments on commit 503e5be

Please sign in to comment.