Skip to content

Commit

Permalink
feat: add event-based API to Client
Browse files Browse the repository at this point in the history
  • Loading branch information
rustaceanrob committed Oct 21, 2024
1 parent 5b434c9 commit 09ad97d
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 19 deletions.
12 changes: 9 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ version = "1.0.0-beta.5"
optional = true

[features]
default = ["trace", "wallet", "rusqlite"]
default = ["wallet", "rusqlite", "events", "callbacks", "trace"]
trace = ["tracing", "tracing-subscriber"]
wallet = ["bdk_wallet"]
rusqlite = ["kyoto-cbf/database"]
callbacks = []
events = []

[dev-dependencies]
tokio = { version = "1.37", features = ["full"], default-features = false }
Expand All @@ -38,8 +40,12 @@ tracing-subscriber = { version = "0.3" }

[[example]]
name = "signet"
required-features = ["rusqlite"]
required-features = ["rusqlite", "callbacks"]

[[example]]
name = "wallet"
required-features = ["wallet", "trace", "rusqlite"]
required-features = ["wallet", "trace", "rusqlite", "callbacks"]

[[example]]
name = "events"
required-features = ["wallet", "rusqlite", "events"]
48 changes: 48 additions & 0 deletions examples/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use bdk_kyoto::builder::LightClientBuilder;
use bdk_kyoto::{Event, LogLevel};
use bdk_wallet::bitcoin::Network;
use bdk_wallet::Wallet;

/* Sync a bdk wallet using events*/

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let desc = "tr([7d94197e/86'/1'/0']tpubDCyQVJj8KzjiQsFjmb3KwECVXPvMwvAxxZGCP9XmWSopmjW3bCV3wD7TgxrUhiGSueDS1MU5X1Vb1YjYcp8jitXc5fXfdC1z68hDDEyKRNr/0/*)";
let change_desc = "tr([7d94197e/86'/1'/0']tpubDCyQVJj8KzjiQsFjmb3KwECVXPvMwvAxxZGCP9XmWSopmjW3bCV3wD7TgxrUhiGSueDS1MU5X1Vb1YjYcp8jitXc5fXfdC1z68hDDEyKRNr/1/*)";

let mut wallet = Wallet::create(desc, change_desc)
.network(Network::Signet)
.lookahead(30)
.create_wallet_no_persist()?;

// The light client builder handles the logic of inserting the SPKs
let (node, mut client) = LightClientBuilder::new(&wallet)
.scan_after(170_000)
.build()
.unwrap();

tokio::task::spawn(async move { node.run().await });

loop {
if let Some(event) = client.next_event(LogLevel::Info).await {
match event {
Event::Log(log) => println!("INFO: {log}"),
Event::Warning(warning) => println!("WARNING: {warning}"),
Event::ScanResponse(full_scan_result) => {
wallet.apply_update(full_scan_result).unwrap();
println!(
"INFO: Balance in BTC: {}",
wallet.balance().total().to_btc()
);
}
Event::PeersFound => println!("INFO: Connected to all necessary peers."),
Event::TxSent(txid) => println!("INFO: Broadcast transaction: {txid}"),
Event::TxFailed(failure_payload) => {
println!("WARNING: Transaction failed to broadcast: {failure_payload:?}")
}
Event::StateChange(node_state) => println!("NEW TASK: {node_state}"),
Event::BlocksDisconnected(_) => {}
}
}
}
}
190 changes: 175 additions & 15 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,39 @@
//! }
//! ```
//!
//! It may be preferable to use events instead of defining a trait. To do so,
//! the workflow for building the node remains the same.
//!
//! ```no_run
//! # const RECEIVE: &str = "tr([7d94197e/86'/1'/0']tpubDCyQVJj8KzjiQsFjmb3KwECVXPvMwvAxxZGCP9XmWSopmjW3bCV3wD7TgxrUhiGSueDS1MU5X1Vb1YjYcp8jitXc5fXfdC1z68hDDEyKRNr/0/*)";
//! # const CHANGE: &str = "tr([7d94197e/86'/1'/0']tpubDCyQVJj8KzjiQsFjmb3KwECVXPvMwvAxxZGCP9XmWSopmjW3bCV3wD7TgxrUhiGSueDS1MU5X1Vb1YjYcp8jitXc5fXfdC1z68hDDEyKRNr/1/*)";
//! # use bdk_kyoto::builder::LightClientBuilder;
//! # use bdk_kyoto::{Event, LogLevel};
//! # use bdk_wallet::bitcoin::Network;
//! # use bdk_wallet::Wallet;
//! #[tokio::main]
//! async fn main() -> anyhow::Result<()> {
//! let mut wallet = Wallet::create(RECEIVE, CHANGE)
//! .network(Network::Signet)
//! .create_wallet_no_persist()?;
//!
//! let (node, mut client) = LightClientBuilder::new(&wallet).build()?;
//!
//! tokio::task::spawn(async move { node.run().await });
//!
//! loop {
//! if let Some(event) = client.next_event(LogLevel::Info).await {
//! match event {
//! Event::ScanResponse(full_scan_result) => {
//! wallet.apply_update(full_scan_result).unwrap();
//! },
//! _ => (),
//! }
//! }
//! }
//! }
//! ```
//!
//! Custom wallet implementations may still take advantage of BDK-Kyoto, however building the
//! [`Client`] will involve configuring Kyoto directly.
//!
Expand Down Expand Up @@ -138,20 +171,23 @@ use bdk_chain::{
IndexedTxGraph,
};
use bdk_chain::{ConfirmationBlockTime, TxUpdate};
use kyoto::{IndexedBlock, SyncUpdate, TxBroadcast};
use kyoto::{IndexedBlock, NodeMessage, SyncUpdate, TxBroadcast};

#[cfg(all(feature = "wallet", feature = "rusqlite"))]
pub mod builder;
#[cfg(feature = "callbacks")]
pub mod logger;

pub use bdk_chain::local_chain::MissingGenesisError;
#[cfg(feature = "rusqlite")]
pub use kyoto::core::builder::NodeDefault;
pub use kyoto::{
ClientError, HeaderCheckpoint, Node, NodeBuilder, NodeMessage, NodeState, Receiver, ScriptBuf,
ServiceFlags, Transaction, TrustedPeer, TxBroadcastPolicy, Txid, Warning, MAINNET_HEADER_CP,
ClientError, HeaderCheckpoint, Node, NodeBuilder, NodeState, Receiver, ScriptBuf, ServiceFlags,
Transaction, TrustedPeer, TxBroadcastPolicy, Txid, Warning, MAINNET_HEADER_CP,
SIGNET_HEADER_CP,
};
#[cfg(feature = "events")]
pub use kyoto::{DisconnectedHeader, FailurePayload};

/// A compact block filter client.
#[derive(Debug)]
Expand Down Expand Up @@ -192,6 +228,7 @@ where
/// A reference to a [`NodeEventHandler`] is required, which handles events emitted from a
/// running [`Node`]. Production applications should define how the application handles
/// these events and displays them to end users.
#[cfg(feature = "callbacks")]
pub async fn update(&mut self, logger: &dyn NodeEventHandler) -> Option<FullScanResult<K>> {
let mut chain_changeset = BTreeMap::new();
while let Ok(message) = self.receiver.recv().await {
Expand Down Expand Up @@ -230,18 +267,11 @@ where
self.chain
.apply_changeset(&local_chain::ChangeSet::from(chain_changeset))
.expect("chain was initialized with genesis");
let tx_update = TxUpdate::from(self.graph.graph().clone());
let graph = core::mem::take(&mut self.graph);
let last_active_indices = graph.index.last_used_indices();
self.graph = IndexedTxGraph::new(graph.index);
Some(FullScanResult {
tx_update,
last_active_indices,
chain_update: Some(self.chain.tip()),
})
Some(self.get_scan_response())
}

// Send dialogs to an arbitrary logger
#[cfg(feature = "callbacks")]
fn log(&self, message: &NodeMessage, logger: &dyn NodeEventHandler) {
match message {
NodeMessage::Dialog(d) => logger.dialog(d.clone()),
Expand All @@ -261,7 +291,6 @@ where
logger.blocks_disconnected(headers.iter().map(|dc| dc.height).collect());
}
NodeMessage::TxSent(t) => {
// If this becomes a type in UniFFI then we can pass it to tx_sent
logger.tx_sent(*t);
}
NodeMessage::TxBroadcastFailure(r) => logger.tx_failed(r.txid),
Expand All @@ -270,6 +299,93 @@ where
}
}

// When the client is believed to have synced to the chain tip of most work,
// we can return a wallet update.
fn get_scan_response(&mut self) -> FullScanResult<K> {
let tx_update = TxUpdate::from(self.graph.graph().clone());
let graph = core::mem::take(&mut self.graph);
let last_active_indices = graph.index.last_used_indices();
self.graph = IndexedTxGraph::new(graph.index);
FullScanResult {
tx_update,
last_active_indices,
chain_update: Some(self.chain.tip()),
}
}

/// Wait for the next event from the client. If no event is ready,
/// `None` will be returned. Otherwise, the event will be `Some(..)`.
///
/// Blocks will be processed while waiting for the next event of relevance.
/// When the node is fully synced to the chain of all connected peers,
/// an update for the provided keychain or underlying wallet will be returned.
///
/// Informational messages on the node operation may be filtered out with
/// [`LogLevel::Warning`], which will only emit warnings when called.
#[cfg(feature = "events")]
pub async fn next_event(&mut self, log_level: LogLevel) -> Option<Event<K>> {
while let Ok(message) = self.receiver.recv().await {
match message {
NodeMessage::Dialog(log) => {
if matches!(log_level, LogLevel::Info) {
return Some(Event::Log(log));
}
}
NodeMessage::Warning(warning) => return Some(Event::Warning(warning)),
NodeMessage::StateChange(node_state) => {
return Some(Event::StateChange(node_state))
}
NodeMessage::ConnectionsMet => return Some(Event::PeersFound),
NodeMessage::Block(IndexedBlock { height, block }) => {
// This is weird but I'm having problems doing things differently.
let mut chain_changeset = BTreeMap::new();
chain_changeset.insert(height, Some(block.block_hash()));
self.chain
.apply_changeset(&local_chain::ChangeSet::from(chain_changeset))
.expect("chain initialized with genesis");
let _ = self.graph.apply_block_relevant(&block, height);
return Some(Event::Log(format!(
"Applied block {} to keychain",
block.block_hash()
)));
}
NodeMessage::Synced(SyncUpdate {
tip: _,
recent_history,
}) => {
let mut chain_changeset = BTreeMap::new();
recent_history.into_iter().for_each(|(height, header)| {
chain_changeset.insert(height, Some(header.block_hash()));
});
self.chain
.apply_changeset(&local_chain::ChangeSet::from(chain_changeset))
.expect("chain was initialized with genesis");
let result = self.get_scan_response();
return Some(Event::ScanResponse(result));
}
NodeMessage::BlocksDisconnected(headers) => {
let mut chain_changeset = BTreeMap::new();
for dc in &headers {
let height = dc.height;
chain_changeset.insert(height, None);
}
self.chain
.apply_changeset(&local_chain::ChangeSet::from(chain_changeset))
.expect("chain was initialized with genesis.");
return Some(Event::BlocksDisconnected(headers));
}
NodeMessage::TxSent(txid) => {
return Some(Event::TxSent(txid));
}
NodeMessage::TxBroadcastFailure(failure_payload) => {
return Some(Event::TxFailed(failure_payload));
}
_ => continue,
}
}
None
}

/// Broadcast a [`Transaction`] with a [`TxBroadcastPolicy`] strategy.
pub async fn broadcast(
&self,
Expand Down Expand Up @@ -306,8 +422,8 @@ where
TransactionBroadcaster::new(self.client.sender())
}

/// Generate a new channel [`Receiver`] to get [`NodeMessage`] directly. Note that
/// [`Client::update`] will handle messages and build updates.
/// Generate a new channel [`Receiver`] to get [`NodeMessage`] directly, instead of using
/// the existing client APIs.
///
/// ## Performance
///
Expand Down Expand Up @@ -353,6 +469,7 @@ impl TransactionBroadcaster {
/// or acting on the underlying wallet. Instead, this trait should be used to drive changes in user
/// interface behavior or keep a simple log. Relevant events that effect on the wallet are handled
/// automatically in [`Client::update`](Client).
#[cfg(feature = "callbacks")]
pub trait NodeEventHandler: Send + Sync + fmt::Debug + 'static {
/// Make use of some message the node has sent.
fn dialog(&self, dialog: String);
Expand All @@ -371,3 +488,46 @@ pub trait NodeEventHandler: Send + Sync + fmt::Debug + 'static {
/// The node has synced to the height of the connected peers.
fn synced(&self, tip: u32);
}

/// Events emitted by a node that may be used by a wallet or application.
#[cfg(feature = "events")]
pub enum Event<K: fmt::Debug + Clone + Ord> {
/// Information about the current node process.
Log(String),
/// Warnings emitted by the node that may effect sync times or node operation.
Warning(Warning),
/// All required connnections have been met.
PeersFound,
/// A transaction was broadcast.
TxSent(Txid),
/// A transaction failed to broadcast or was rejected.
TxFailed(FailurePayload),
/// The node is performing a new task.
StateChange(NodeState),
/// A result after scanning compact block filters to the tip of the chain.
///
/// ## Note
///
/// This event will be emitted every time a new block is found while the node
/// is running and is connected to peers.
ScanResponse(FullScanResult<K>),
/// Blocks were reorganized from the chain of most work.
///
/// ## Note
///
/// No action is required from the developer, as these events are already
/// handled within the [`Client`]. This event is to inform the user of
/// such an event.
BlocksDisconnected(Vec<DisconnectedHeader>),
}

/// Filter [`Event`] by a specified level. [`LogLevel::Info`] will pass
/// through both [`Event::Log`] and [`Event::Warning`]. [`LogLevel::Warning`]
/// will omit [`Event::Log`] events.
#[cfg(feature = "events")]
pub enum LogLevel {
/// Receive info messages and warnings.
Info,
/// Omit info messages and only receive warnings.
Warning,
}
3 changes: 2 additions & 1 deletion src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//! printing the display to the console.
//!
//! ```rust
//! #[cfg(feature = "callbacks")]
//! use bdk_kyoto::logger::PrintLogger;
//! use bdk_kyoto::Warning;
//! use bdk_kyoto::NodeEventHandler;
Expand All @@ -15,7 +16,7 @@
//! logger.warning(Warning::PeerTimedOut);
//! ```
//!
//! For a more descriptive console log, the `tracing` feature may be used.
//! For a more descriptive console log, the `trace` feature may be used.
//!
//! ```rust
//! use bdk_kyoto::logger::TraceLogger;
Expand Down
3 changes: 3 additions & 0 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use tokio::task;
use tokio::time;

use bdk_kyoto::builder::LightClientBuilder;
#[cfg(feature = "callbacks")]
use bdk_kyoto::logger::PrintLogger;
use bdk_kyoto::NodeDefault;
use bdk_kyoto::TrustedPeer;
Expand Down Expand Up @@ -57,6 +58,7 @@ fn init_node(
}

#[tokio::test]
#[cfg(feature = "callbacks")]
async fn update_returns_blockchain_data() -> anyhow::Result<()> {
let env = testenv()?;

Expand Down Expand Up @@ -118,6 +120,7 @@ async fn update_returns_blockchain_data() -> anyhow::Result<()> {
}

#[tokio::test]
#[cfg(feature = "callbacks")]
async fn update_handles_reorg() -> anyhow::Result<()> {
let env = testenv()?;

Expand Down

0 comments on commit 09ad97d

Please sign in to comment.