Skip to content

Commit

Permalink
WIP: event API
Browse files Browse the repository at this point in the history
  • Loading branch information
rustaceanrob committed Oct 15, 2024
1 parent 5b434c9 commit 8b91c6e
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 7 deletions.
8 changes: 5 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ version = "1.0.0-beta.5"
optional = true

[features]
default = ["trace", "wallet", "rusqlite"]
default = ["wallet", "rusqlite", "events", "callbacks", "trace"]
trace = ["tracing", "tracing-subscriber"]
wallet = ["bdk_wallet"]
rusqlite = ["kyoto-cbf/database"]
callbacks = []
events = []

[dev-dependencies]
tokio = { version = "1.37", features = ["full"], default-features = false }
Expand All @@ -38,8 +40,8 @@ tracing-subscriber = { version = "0.3" }

[[example]]
name = "signet"
required-features = ["rusqlite"]
required-features = ["rusqlite", "callbacks"]

[[example]]
name = "wallet"
required-features = ["wallet", "trace", "rusqlite"]
required-features = ["wallet", "trace", "rusqlite", "callbacks"]
47 changes: 47 additions & 0 deletions examples/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use bdk_kyoto::builder::LightClientBuilder;
use bdk_kyoto::{Event, LogLevel};
use bdk_wallet::bitcoin::Network;
use bdk_wallet::Wallet;

/* Sync a bdk wallet using events*/

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let desc = "tr([7d94197e/86'/1'/0']tpubDCyQVJj8KzjiQsFjmb3KwECVXPvMwvAxxZGCP9XmWSopmjW3bCV3wD7TgxrUhiGSueDS1MU5X1Vb1YjYcp8jitXc5fXfdC1z68hDDEyKRNr/0/*)";
let change_desc = "tr([7d94197e/86'/1'/0']tpubDCyQVJj8KzjiQsFjmb3KwECVXPvMwvAxxZGCP9XmWSopmjW3bCV3wD7TgxrUhiGSueDS1MU5X1Vb1YjYcp8jitXc5fXfdC1z68hDDEyKRNr/1/*)";

let mut wallet = Wallet::create(desc, change_desc)
.network(Network::Signet)
.lookahead(30)
.create_wallet_no_persist()?;

// The light client builder handles the logic of inserting the SPKs
let (node, mut client) = LightClientBuilder::new(&wallet)
.scan_after(170_000)
.build()
.unwrap();

tokio::task::spawn(async move { node.run().await });

loop {
if let Some(event) = client.next_event(LogLevel::Info).await {
match event {
Event::Log(log) => println!("INFO: {log}"),
Event::Warning(warning) => println!("WARNING: {warning}"),
Event::ScanResult(full_scan_result) => {
wallet.apply_update(full_scan_result).unwrap();
println!(
"INFO: Balance in BTC: {}",
wallet.balance().total().to_sat()
);
}
Event::PeersFound => println!("INFO: Connected to all necessary peers."),
Event::TxSent(txid) => println!("INFO: Broadcast transaction: {txid}"),
Event::TxFailed(failure_payload) => {
println!("WARNING: Transaction failed to broadcast: {failure_payload:?}")
}
Event::StateChange(node_state) => println!("NEW TASK: {node_state}"),
}
}
}
}
116 changes: 113 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,15 +142,16 @@ use kyoto::{IndexedBlock, SyncUpdate, TxBroadcast};

#[cfg(all(feature = "wallet", feature = "rusqlite"))]
pub mod builder;
#[cfg(feature = "callbacks")]
pub mod logger;

pub use bdk_chain::local_chain::MissingGenesisError;
#[cfg(feature = "rusqlite")]
pub use kyoto::core::builder::NodeDefault;
pub use kyoto::{
ClientError, HeaderCheckpoint, Node, NodeBuilder, NodeMessage, NodeState, Receiver, ScriptBuf,
ServiceFlags, Transaction, TrustedPeer, TxBroadcastPolicy, Txid, Warning, MAINNET_HEADER_CP,
SIGNET_HEADER_CP,
ClientError, FailurePayload, HeaderCheckpoint, Node, NodeBuilder, NodeMessage, NodeState,
Receiver, ScriptBuf, ServiceFlags, Transaction, TrustedPeer, TxBroadcastPolicy, Txid, Warning,
MAINNET_HEADER_CP, SIGNET_HEADER_CP,
};

/// A compact block filter client.
Expand Down Expand Up @@ -192,6 +193,7 @@ where
/// A reference to a [`NodeEventHandler`] is required, which handles events emitted from a
/// running [`Node`]. Production applications should define how the application handles
/// these events and displays them to end users.
#[cfg(feature = "callbacks")]
pub async fn update(&mut self, logger: &dyn NodeEventHandler) -> Option<FullScanResult<K>> {
let mut chain_changeset = BTreeMap::new();
while let Ok(message) = self.receiver.recv().await {
Expand Down Expand Up @@ -242,6 +244,7 @@ where
}

// Send dialogs to an arbitrary logger
#[cfg(feature = "callbacks")]
fn log(&self, message: &NodeMessage, logger: &dyn NodeEventHandler) {
match message {
NodeMessage::Dialog(d) => logger.dialog(d.clone()),
Expand Down Expand Up @@ -284,6 +287,82 @@ where
.await
}

/// Wait for the next event from the client. If no event is ready,
/// `None` will be returned. Otherwise, the event will be `Some(..)`.
///
/// Blocks will be processed while waiting for the next event of relevance.
/// When the node is fully synced to the chain of all connected peers,
/// an update for the provided keychain or underlying wallet will be returned.
///
/// Informational messages on the node operation may be filtered out with
/// [`LogLevel::Warning`], which will only emit warnings when called.
#[cfg(feature = "events")]
pub async fn next_event(&mut self, log_level: LogLevel) -> Option<Event<K>> {
while let Ok(message) = self.receiver.recv().await {
match message {
NodeMessage::Dialog(log) => {
if matches!(log_level, LogLevel::Info) {
return Some(Event::Log(log));
}
}
NodeMessage::Warning(warning) => return Some(Event::Warning(warning)),
NodeMessage::StateChange(node_state) => {
return Some(Event::StateChange(node_state))
}
NodeMessage::ConnectionsMet => return Some(Event::PeersFound),
NodeMessage::Block(IndexedBlock { height, block }) => {
// This is weird but I'm having problems doing things differently.
let mut chain_changeset = BTreeMap::new();
chain_changeset.insert(height, Some(block.block_hash()));
self.chain
.apply_changeset(&local_chain::ChangeSet::from(chain_changeset))
.expect("chain initialized with genesis");
let _ = self.graph.apply_block_relevant(&block, height);
}
NodeMessage::Synced(SyncUpdate {
tip: _,
recent_history,
}) => {
let mut chain_changeset = BTreeMap::new();
recent_history.into_iter().for_each(|(height, header)| {
chain_changeset.insert(height, Some(header.block_hash()));
});
self.chain
.apply_changeset(&local_chain::ChangeSet::from(chain_changeset))
.expect("chain was initialized with genesis");
let tx_update = TxUpdate::from(self.graph.graph().clone());
let graph = core::mem::take(&mut self.graph);
let last_active_indices = graph.index.last_used_indices();
self.graph = IndexedTxGraph::new(graph.index);
let result = FullScanResult {
tx_update,
last_active_indices,
chain_update: Some(self.chain.tip()),
};
return Some(Event::ScanResult(result));
}
NodeMessage::BlocksDisconnected(headers) => {
let mut chain_changeset = BTreeMap::new();
for dc in headers {
let height = dc.height;
chain_changeset.insert(height, None);
}
self.chain
.apply_changeset(&local_chain::ChangeSet::from(chain_changeset))
.expect("chain was initialized with genesis.");
}
NodeMessage::TxSent(txid) => {
return Some(Event::TxSent(txid));
}
NodeMessage::TxBroadcastFailure(failure_payload) => {
return Some(Event::TxFailed(failure_payload));
}
_ => continue,
}
}
None
}

/// Add more scripts to the node. For example, a user may reveal a Bitcoin address to receive a
/// payment, so this script should be added to the [`Node`].
///
Expand Down Expand Up @@ -353,6 +432,7 @@ impl TransactionBroadcaster {
/// or acting on the underlying wallet. Instead, this trait should be used to drive changes in user
/// interface behavior or keep a simple log. Relevant events that effect on the wallet are handled
/// automatically in [`Client::update`](Client).
#[cfg(feature = "callbacks")]
pub trait NodeEventHandler: Send + Sync + fmt::Debug + 'static {
/// Make use of some message the node has sent.
fn dialog(&self, dialog: String);
Expand All @@ -371,3 +451,33 @@ pub trait NodeEventHandler: Send + Sync + fmt::Debug + 'static {
/// The node has synced to the height of the connected peers.
fn synced(&self, tip: u32);
}

/// Events emitted by a node that may be used by a wallet or application.
#[cfg(feature = "events")]
pub enum Event<K: fmt::Debug + Clone + Ord> {
/// Information about the current node process.
Log(String),
/// Warnings emitted by the node that may effect sync times or node operation.
Warning(Warning),
/// All required connnections have been met.
PeersFound,
/// A transaction was broadcast.
TxSent(Txid),
/// A transaction failed to broadcast or was rejected.
TxFailed(FailurePayload),
/// The node is performing a new task.
StateChange(NodeState),
/// A result after scanning compact block filters to the tip of the chain.
ScanResult(FullScanResult<K>),
}

/// Filter [`Event`] by a specified level. [`LogLevel::Debug`] will pass
/// through both [`Event::Log`] and [`Event::Warning`]. [`LogLevel::Warning`]
/// will omit [`Event::Log`] events.
#[cfg(feature = "events")]
pub enum LogLevel {
/// Receive info messages and warnings.
Info,
/// Omit info messages and only receive warnings.
Warning,
}
3 changes: 2 additions & 1 deletion src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//! printing the display to the console.
//!
//! ```rust
//! #[cfg(feature = "callbacks")]
//! use bdk_kyoto::logger::PrintLogger;
//! use bdk_kyoto::Warning;
//! use bdk_kyoto::NodeEventHandler;
Expand All @@ -15,7 +16,7 @@
//! logger.warning(Warning::PeerTimedOut);
//! ```
//!
//! For a more descriptive console log, the `tracing` feature may be used.
//! For a more descriptive console log, the `trace` feature may be used.
//!
//! ```rust
//! use bdk_kyoto::logger::TraceLogger;
Expand Down
3 changes: 3 additions & 0 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use tokio::task;
use tokio::time;

use bdk_kyoto::builder::LightClientBuilder;
#[cfg(feature = "callbacks")]
use bdk_kyoto::logger::PrintLogger;
use bdk_kyoto::NodeDefault;
use bdk_kyoto::TrustedPeer;
Expand Down Expand Up @@ -57,6 +58,7 @@ fn init_node(
}

#[tokio::test]
#[cfg(feature = "callbacks")]
async fn update_returns_blockchain_data() -> anyhow::Result<()> {
let env = testenv()?;

Expand Down Expand Up @@ -118,6 +120,7 @@ async fn update_returns_blockchain_data() -> anyhow::Result<()> {
}

#[tokio::test]
#[cfg(feature = "callbacks")]
async fn update_handles_reorg() -> anyhow::Result<()> {
let env = testenv()?;

Expand Down

0 comments on commit 8b91c6e

Please sign in to comment.