From 09ad97d7959d0246d58059d9f22fb705a27d5caa Mon Sep 17 00:00:00 2001 From: Rob N Date: Mon, 14 Oct 2024 09:33:09 -1000 Subject: [PATCH] feat: add event-based API to `Client` --- Cargo.toml | 12 ++- examples/events.rs | 48 ++++++++++++ src/lib.rs | 190 +++++++++++++++++++++++++++++++++++++++++---- src/logger.rs | 3 +- tests/client.rs | 3 + 5 files changed, 237 insertions(+), 19 deletions(-) create mode 100644 examples/events.rs diff --git a/Cargo.toml b/Cargo.toml index 1f7d661..9de5cbd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,10 +22,12 @@ version = "1.0.0-beta.5" optional = true [features] -default = ["trace", "wallet", "rusqlite"] +default = ["wallet", "rusqlite", "events", "callbacks", "trace"] trace = ["tracing", "tracing-subscriber"] wallet = ["bdk_wallet"] rusqlite = ["kyoto-cbf/database"] +callbacks = [] +events = [] [dev-dependencies] tokio = { version = "1.37", features = ["full"], default-features = false } @@ -38,8 +40,12 @@ tracing-subscriber = { version = "0.3" } [[example]] name = "signet" -required-features = ["rusqlite"] +required-features = ["rusqlite", "callbacks"] [[example]] name = "wallet" -required-features = ["wallet", "trace", "rusqlite"] +required-features = ["wallet", "trace", "rusqlite", "callbacks"] + +[[example]] +name = "events" +required-features = ["wallet", "rusqlite", "events"] diff --git a/examples/events.rs b/examples/events.rs new file mode 100644 index 0000000..7cba657 --- /dev/null +++ b/examples/events.rs @@ -0,0 +1,48 @@ +use bdk_kyoto::builder::LightClientBuilder; +use bdk_kyoto::{Event, LogLevel}; +use bdk_wallet::bitcoin::Network; +use bdk_wallet::Wallet; + +/* Sync a bdk wallet using events*/ + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let desc = "tr([7d94197e/86'/1'/0']tpubDCyQVJj8KzjiQsFjmb3KwECVXPvMwvAxxZGCP9XmWSopmjW3bCV3wD7TgxrUhiGSueDS1MU5X1Vb1YjYcp8jitXc5fXfdC1z68hDDEyKRNr/0/*)"; + let change_desc = "tr([7d94197e/86'/1'/0']tpubDCyQVJj8KzjiQsFjmb3KwECVXPvMwvAxxZGCP9XmWSopmjW3bCV3wD7TgxrUhiGSueDS1MU5X1Vb1YjYcp8jitXc5fXfdC1z68hDDEyKRNr/1/*)"; + + let mut wallet = Wallet::create(desc, change_desc) + .network(Network::Signet) + .lookahead(30) + .create_wallet_no_persist()?; + + // The light client builder handles the logic of inserting the SPKs + let (node, mut client) = LightClientBuilder::new(&wallet) + .scan_after(170_000) + .build() + .unwrap(); + + tokio::task::spawn(async move { node.run().await }); + + loop { + if let Some(event) = client.next_event(LogLevel::Info).await { + match event { + Event::Log(log) => println!("INFO: {log}"), + Event::Warning(warning) => println!("WARNING: {warning}"), + Event::ScanResponse(full_scan_result) => { + wallet.apply_update(full_scan_result).unwrap(); + println!( + "INFO: Balance in BTC: {}", + wallet.balance().total().to_btc() + ); + } + Event::PeersFound => println!("INFO: Connected to all necessary peers."), + Event::TxSent(txid) => println!("INFO: Broadcast transaction: {txid}"), + Event::TxFailed(failure_payload) => { + println!("WARNING: Transaction failed to broadcast: {failure_payload:?}") + } + Event::StateChange(node_state) => println!("NEW TASK: {node_state}"), + Event::BlocksDisconnected(_) => {} + } + } + } +} diff --git a/src/lib.rs b/src/lib.rs index d77f50a..fc02c96 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -47,6 +47,39 @@ //! } //! ``` //! +//! It may be preferable to use events instead of defining a trait. To do so, +//! the workflow for building the node remains the same. +//! +//! ```no_run +//! # const RECEIVE: &str = "tr([7d94197e/86'/1'/0']tpubDCyQVJj8KzjiQsFjmb3KwECVXPvMwvAxxZGCP9XmWSopmjW3bCV3wD7TgxrUhiGSueDS1MU5X1Vb1YjYcp8jitXc5fXfdC1z68hDDEyKRNr/0/*)"; +//! # const CHANGE: &str = "tr([7d94197e/86'/1'/0']tpubDCyQVJj8KzjiQsFjmb3KwECVXPvMwvAxxZGCP9XmWSopmjW3bCV3wD7TgxrUhiGSueDS1MU5X1Vb1YjYcp8jitXc5fXfdC1z68hDDEyKRNr/1/*)"; +//! # use bdk_kyoto::builder::LightClientBuilder; +//! # use bdk_kyoto::{Event, LogLevel}; +//! # use bdk_wallet::bitcoin::Network; +//! # use bdk_wallet::Wallet; +//! #[tokio::main] +//! async fn main() -> anyhow::Result<()> { +//! let mut wallet = Wallet::create(RECEIVE, CHANGE) +//! .network(Network::Signet) +//! .create_wallet_no_persist()?; +//! +//! let (node, mut client) = LightClientBuilder::new(&wallet).build()?; +//! +//! tokio::task::spawn(async move { node.run().await }); +//! +//! loop { +//! if let Some(event) = client.next_event(LogLevel::Info).await { +//! match event { +//! Event::ScanResponse(full_scan_result) => { +//! wallet.apply_update(full_scan_result).unwrap(); +//! }, +//! _ => (), +//! } +//! } +//! } +//! } +//! ``` +//! //! Custom wallet implementations may still take advantage of BDK-Kyoto, however building the //! [`Client`] will involve configuring Kyoto directly. //! @@ -138,20 +171,23 @@ use bdk_chain::{ IndexedTxGraph, }; use bdk_chain::{ConfirmationBlockTime, TxUpdate}; -use kyoto::{IndexedBlock, SyncUpdate, TxBroadcast}; +use kyoto::{IndexedBlock, NodeMessage, SyncUpdate, TxBroadcast}; #[cfg(all(feature = "wallet", feature = "rusqlite"))] pub mod builder; +#[cfg(feature = "callbacks")] pub mod logger; pub use bdk_chain::local_chain::MissingGenesisError; #[cfg(feature = "rusqlite")] pub use kyoto::core::builder::NodeDefault; pub use kyoto::{ - ClientError, HeaderCheckpoint, Node, NodeBuilder, NodeMessage, NodeState, Receiver, ScriptBuf, - ServiceFlags, Transaction, TrustedPeer, TxBroadcastPolicy, Txid, Warning, MAINNET_HEADER_CP, + ClientError, HeaderCheckpoint, Node, NodeBuilder, NodeState, Receiver, ScriptBuf, ServiceFlags, + Transaction, TrustedPeer, TxBroadcastPolicy, Txid, Warning, MAINNET_HEADER_CP, SIGNET_HEADER_CP, }; +#[cfg(feature = "events")] +pub use kyoto::{DisconnectedHeader, FailurePayload}; /// A compact block filter client. #[derive(Debug)] @@ -192,6 +228,7 @@ where /// A reference to a [`NodeEventHandler`] is required, which handles events emitted from a /// running [`Node`]. Production applications should define how the application handles /// these events and displays them to end users. + #[cfg(feature = "callbacks")] pub async fn update(&mut self, logger: &dyn NodeEventHandler) -> Option> { let mut chain_changeset = BTreeMap::new(); while let Ok(message) = self.receiver.recv().await { @@ -230,18 +267,11 @@ where self.chain .apply_changeset(&local_chain::ChangeSet::from(chain_changeset)) .expect("chain was initialized with genesis"); - let tx_update = TxUpdate::from(self.graph.graph().clone()); - let graph = core::mem::take(&mut self.graph); - let last_active_indices = graph.index.last_used_indices(); - self.graph = IndexedTxGraph::new(graph.index); - Some(FullScanResult { - tx_update, - last_active_indices, - chain_update: Some(self.chain.tip()), - }) + Some(self.get_scan_response()) } // Send dialogs to an arbitrary logger + #[cfg(feature = "callbacks")] fn log(&self, message: &NodeMessage, logger: &dyn NodeEventHandler) { match message { NodeMessage::Dialog(d) => logger.dialog(d.clone()), @@ -261,7 +291,6 @@ where logger.blocks_disconnected(headers.iter().map(|dc| dc.height).collect()); } NodeMessage::TxSent(t) => { - // If this becomes a type in UniFFI then we can pass it to tx_sent logger.tx_sent(*t); } NodeMessage::TxBroadcastFailure(r) => logger.tx_failed(r.txid), @@ -270,6 +299,93 @@ where } } + // When the client is believed to have synced to the chain tip of most work, + // we can return a wallet update. + fn get_scan_response(&mut self) -> FullScanResult { + let tx_update = TxUpdate::from(self.graph.graph().clone()); + let graph = core::mem::take(&mut self.graph); + let last_active_indices = graph.index.last_used_indices(); + self.graph = IndexedTxGraph::new(graph.index); + FullScanResult { + tx_update, + last_active_indices, + chain_update: Some(self.chain.tip()), + } + } + + /// Wait for the next event from the client. If no event is ready, + /// `None` will be returned. Otherwise, the event will be `Some(..)`. + /// + /// Blocks will be processed while waiting for the next event of relevance. + /// When the node is fully synced to the chain of all connected peers, + /// an update for the provided keychain or underlying wallet will be returned. + /// + /// Informational messages on the node operation may be filtered out with + /// [`LogLevel::Warning`], which will only emit warnings when called. + #[cfg(feature = "events")] + pub async fn next_event(&mut self, log_level: LogLevel) -> Option> { + while let Ok(message) = self.receiver.recv().await { + match message { + NodeMessage::Dialog(log) => { + if matches!(log_level, LogLevel::Info) { + return Some(Event::Log(log)); + } + } + NodeMessage::Warning(warning) => return Some(Event::Warning(warning)), + NodeMessage::StateChange(node_state) => { + return Some(Event::StateChange(node_state)) + } + NodeMessage::ConnectionsMet => return Some(Event::PeersFound), + NodeMessage::Block(IndexedBlock { height, block }) => { + // This is weird but I'm having problems doing things differently. + let mut chain_changeset = BTreeMap::new(); + chain_changeset.insert(height, Some(block.block_hash())); + self.chain + .apply_changeset(&local_chain::ChangeSet::from(chain_changeset)) + .expect("chain initialized with genesis"); + let _ = self.graph.apply_block_relevant(&block, height); + return Some(Event::Log(format!( + "Applied block {} to keychain", + block.block_hash() + ))); + } + NodeMessage::Synced(SyncUpdate { + tip: _, + recent_history, + }) => { + let mut chain_changeset = BTreeMap::new(); + recent_history.into_iter().for_each(|(height, header)| { + chain_changeset.insert(height, Some(header.block_hash())); + }); + self.chain + .apply_changeset(&local_chain::ChangeSet::from(chain_changeset)) + .expect("chain was initialized with genesis"); + let result = self.get_scan_response(); + return Some(Event::ScanResponse(result)); + } + NodeMessage::BlocksDisconnected(headers) => { + let mut chain_changeset = BTreeMap::new(); + for dc in &headers { + let height = dc.height; + chain_changeset.insert(height, None); + } + self.chain + .apply_changeset(&local_chain::ChangeSet::from(chain_changeset)) + .expect("chain was initialized with genesis."); + return Some(Event::BlocksDisconnected(headers)); + } + NodeMessage::TxSent(txid) => { + return Some(Event::TxSent(txid)); + } + NodeMessage::TxBroadcastFailure(failure_payload) => { + return Some(Event::TxFailed(failure_payload)); + } + _ => continue, + } + } + None + } + /// Broadcast a [`Transaction`] with a [`TxBroadcastPolicy`] strategy. pub async fn broadcast( &self, @@ -306,8 +422,8 @@ where TransactionBroadcaster::new(self.client.sender()) } - /// Generate a new channel [`Receiver`] to get [`NodeMessage`] directly. Note that - /// [`Client::update`] will handle messages and build updates. + /// Generate a new channel [`Receiver`] to get [`NodeMessage`] directly, instead of using + /// the existing client APIs. /// /// ## Performance /// @@ -353,6 +469,7 @@ impl TransactionBroadcaster { /// or acting on the underlying wallet. Instead, this trait should be used to drive changes in user /// interface behavior or keep a simple log. Relevant events that effect on the wallet are handled /// automatically in [`Client::update`](Client). +#[cfg(feature = "callbacks")] pub trait NodeEventHandler: Send + Sync + fmt::Debug + 'static { /// Make use of some message the node has sent. fn dialog(&self, dialog: String); @@ -371,3 +488,46 @@ pub trait NodeEventHandler: Send + Sync + fmt::Debug + 'static { /// The node has synced to the height of the connected peers. fn synced(&self, tip: u32); } + +/// Events emitted by a node that may be used by a wallet or application. +#[cfg(feature = "events")] +pub enum Event { + /// Information about the current node process. + Log(String), + /// Warnings emitted by the node that may effect sync times or node operation. + Warning(Warning), + /// All required connnections have been met. + PeersFound, + /// A transaction was broadcast. + TxSent(Txid), + /// A transaction failed to broadcast or was rejected. + TxFailed(FailurePayload), + /// The node is performing a new task. + StateChange(NodeState), + /// A result after scanning compact block filters to the tip of the chain. + /// + /// ## Note + /// + /// This event will be emitted every time a new block is found while the node + /// is running and is connected to peers. + ScanResponse(FullScanResult), + /// Blocks were reorganized from the chain of most work. + /// + /// ## Note + /// + /// No action is required from the developer, as these events are already + /// handled within the [`Client`]. This event is to inform the user of + /// such an event. + BlocksDisconnected(Vec), +} + +/// Filter [`Event`] by a specified level. [`LogLevel::Info`] will pass +/// through both [`Event::Log`] and [`Event::Warning`]. [`LogLevel::Warning`] +/// will omit [`Event::Log`] events. +#[cfg(feature = "events")] +pub enum LogLevel { + /// Receive info messages and warnings. + Info, + /// Omit info messages and only receive warnings. + Warning, +} diff --git a/src/logger.rs b/src/logger.rs index 391b96e..56523bf 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -6,6 +6,7 @@ //! printing the display to the console. //! //! ```rust +//! #[cfg(feature = "callbacks")] //! use bdk_kyoto::logger::PrintLogger; //! use bdk_kyoto::Warning; //! use bdk_kyoto::NodeEventHandler; @@ -15,7 +16,7 @@ //! logger.warning(Warning::PeerTimedOut); //! ``` //! -//! For a more descriptive console log, the `tracing` feature may be used. +//! For a more descriptive console log, the `trace` feature may be used. //! //! ```rust //! use bdk_kyoto::logger::TraceLogger; diff --git a/tests/client.rs b/tests/client.rs index 6d8257d..d34f8a7 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -5,6 +5,7 @@ use tokio::task; use tokio::time; use bdk_kyoto::builder::LightClientBuilder; +#[cfg(feature = "callbacks")] use bdk_kyoto::logger::PrintLogger; use bdk_kyoto::NodeDefault; use bdk_kyoto::TrustedPeer; @@ -57,6 +58,7 @@ fn init_node( } #[tokio::test] +#[cfg(feature = "callbacks")] async fn update_returns_blockchain_data() -> anyhow::Result<()> { let env = testenv()?; @@ -118,6 +120,7 @@ async fn update_returns_blockchain_data() -> anyhow::Result<()> { } #[tokio::test] +#[cfg(feature = "callbacks")] async fn update_handles_reorg() -> anyhow::Result<()> { let env = testenv()?;