Skip to content

Commit

Permalink
WIP: event API
Browse files Browse the repository at this point in the history
  • Loading branch information
rustaceanrob committed Oct 14, 2024
1 parent 5b434c9 commit 70c6956
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 1 deletion.
44 changes: 44 additions & 0 deletions examples/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use bdk_kyoto::builder::LightClientBuilder;
use bdk_kyoto::{Event, LogLevel};
use bdk_wallet::bitcoin::Network;
use bdk_wallet::Wallet;

/* Sync a bdk wallet using events*/

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let desc = "tr([7d94197e/86'/1'/0']tpubDCyQVJj8KzjiQsFjmb3KwECVXPvMwvAxxZGCP9XmWSopmjW3bCV3wD7TgxrUhiGSueDS1MU5X1Vb1YjYcp8jitXc5fXfdC1z68hDDEyKRNr/0/*)";
let change_desc = "tr([7d94197e/86'/1'/0']tpubDCyQVJj8KzjiQsFjmb3KwECVXPvMwvAxxZGCP9XmWSopmjW3bCV3wD7TgxrUhiGSueDS1MU5X1Vb1YjYcp8jitXc5fXfdC1z68hDDEyKRNr/1/*)";

let mut wallet = Wallet::create(desc, change_desc)
.network(Network::Signet)
.lookahead(30)
.create_wallet_no_persist()?;

// The light client builder handles the logic of inserting the SPKs
let (node, mut client) = LightClientBuilder::new(&wallet)
.scan_after(170_000)
.build()
.unwrap();

tokio::task::spawn(async move { node.run().await });

loop {
if let Some(event) = client.next_event(LogLevel::Debug).await {
match event {
Event::Log(log) => println!("INFO: {log}"),
Event::Warning(warning) => println!("WARNING: {warning}"),
Event::ScanResult(full_scan_result) => {
wallet.apply_update(full_scan_result).unwrap();
println!("Balance in BTC: {}", wallet.balance().total().to_sat());
}
Event::PeersFound => println!("INFO: Connected to all necessary peers."),
Event::TxSent(txid) => println!("INFO: Broadcast transaction: {txid}"),
Event::TxFailed(failure_payload) => {
println!("WARNING: Transaction failed to broadcast: {failure_payload:?}")
}
Event::StateChange(node_state) => println!("NEW TASK: {node_state}"),
}
}
}
}
105 changes: 104 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ use bdk_chain::{
IndexedTxGraph,
};
use bdk_chain::{ConfirmationBlockTime, TxUpdate};
use kyoto::{IndexedBlock, SyncUpdate, TxBroadcast};
use kyoto::{FailurePayload, IndexedBlock, SyncUpdate, TxBroadcast};

#[cfg(all(feature = "wallet", feature = "rusqlite"))]
pub mod builder;
Expand Down Expand Up @@ -284,6 +284,81 @@ where
.await
}

/// Wait for the next event from the client. If no event is ready,
/// `None` will be returned. Otherwise, the event will be `Some(..)`.
///
/// Blocks will be processed while waiting for the next event of relevance.
/// When the node is fully synced to the chain of all connected peers,
/// an update for the provided keychain or underlying wallet will be returned.
///
/// Informational messages on the node operation may be filtered out with
/// [`LogLevel::Warning`], which will only emit warnings when called.
pub async fn next_event(&mut self, log_level: LogLevel) -> Option<Event<K>> {
while let Ok(message) = self.receiver.recv().await {
match message {
NodeMessage::Dialog(log) => {
if matches!(log_level, LogLevel::Debug) {
return Some(Event::Log(log));
}
}
NodeMessage::Warning(warning) => return Some(Event::Warning(warning)),
NodeMessage::StateChange(node_state) => {
return Some(Event::StateChange(node_state))
}
NodeMessage::ConnectionsMet => return Some(Event::PeersFound),
NodeMessage::Block(IndexedBlock { height, block }) => {
// This is weird but I'm having problems doing things differently.
let mut chain_changeset = BTreeMap::new();
chain_changeset.insert(height, Some(block.block_hash()));
self.chain
.apply_changeset(&local_chain::ChangeSet::from(chain_changeset))
.expect("chain initialized with genesis");
let _ = self.graph.apply_block_relevant(&block, height);
}
NodeMessage::Synced(SyncUpdate {
tip: _,
recent_history,
}) => {
let mut chain_changeset = BTreeMap::new();
recent_history.into_iter().for_each(|(height, header)| {
chain_changeset.insert(height, Some(header.block_hash()));
});
self.chain
.apply_changeset(&local_chain::ChangeSet::from(chain_changeset))
.expect("chain was initialized with genesis");
let tx_update = TxUpdate::from(self.graph.graph().clone());
let graph = core::mem::take(&mut self.graph);
let last_active_indices = graph.index.last_used_indices();
self.graph = IndexedTxGraph::new(graph.index);
let result = FullScanResult {
tx_update,
last_active_indices,
chain_update: Some(self.chain.tip()),
};
return Some(Event::ScanResult(result));
}
NodeMessage::BlocksDisconnected(headers) => {
let mut chain_changeset = BTreeMap::new();
for dc in headers {
let height = dc.height;
chain_changeset.insert(height, None);
}
self.chain
.apply_changeset(&local_chain::ChangeSet::from(chain_changeset))
.expect("chain was initialized with genesis.");
}
NodeMessage::TxSent(txid) => {
return Some(Event::TxSent(txid));
}
NodeMessage::TxBroadcastFailure(failure_payload) => {
return Some(Event::TxFailed(failure_payload));
}
_ => continue,
}
}
None
}

/// Add more scripts to the node. For example, a user may reveal a Bitcoin address to receive a
/// payment, so this script should be added to the [`Node`].
///
Expand Down Expand Up @@ -371,3 +446,31 @@ pub trait NodeEventHandler: Send + Sync + fmt::Debug + 'static {
/// The node has synced to the height of the connected peers.
fn synced(&self, tip: u32);
}

/// Events emitted by a node that may be used by a wallet or application.
pub enum Event<K: fmt::Debug + Clone + Ord> {
/// Information about the current node process.
Log(String),
/// Warnings emitted by the node that may effect sync times or node operation.
Warning(Warning),
/// All required connnections have been met.
PeersFound,
/// A transaction was broadcast.
TxSent(Txid),
/// A transaction failed to broadcast or was rejected.
TxFailed(FailurePayload),
/// The node is performing a new task.
StateChange(NodeState),
/// A result after scanning compact block filters to the tip of the chain.
ScanResult(FullScanResult<K>),
}

/// Filter [`Event`] by a specified level. [`LogLevel::Debug`] will pass
/// through both [`Event::Log`] and [`Event::Warning`]. [`LogLevel::Warning`]
/// will omit [`Event::Log`] events.
pub enum LogLevel {
/// Receive info messages and warnings.
Debug,
/// Omit info messages and only receive warnings.
Warning,
}

0 comments on commit 70c6956

Please sign in to comment.