diff --git a/Cargo.lock b/Cargo.lock index 0ca2a72..a329006 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -352,12 +352,6 @@ dependencies = [ "socks", ] -[[package]] -name = "port_check" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2110609fb863cdb367d4e69d6c43c81ba6a8c7d18e80082fe9f3ef16b23afeed" - [[package]] name = "proc-macro2" version = "1.0.83" @@ -369,7 +363,7 @@ dependencies = [ [[package]] name = "pushtx" -version = "0.3.0" +version = "0.4.0" dependencies = [ "bitcoin", "crossbeam-channel", @@ -379,13 +373,12 @@ dependencies = [ "hex", "log", "peerlink", - "port_check", "sha3", ] [[package]] name = "pushtx-cli" -version = "0.3.0" +version = "0.4.0" dependencies = [ "anyhow", "clap", diff --git a/README.md b/README.md index ae99802..1214743 100644 --- a/README.md +++ b/README.md @@ -20,8 +20,9 @@ also works. 1. Resolve peers through DNS seeds. 2. Detect if Tor is present. 3. Connect to 10 random peers, through Tor if possible. -4. Broadcast the transaction. -5. Disconnect. +4. Broadcast the transaction to a single peer. +5. Wait until the transaction is seen on the network. +6. Disconnect. ### Executable @@ -42,7 +43,7 @@ Install with Cargo: `cargo install pushtx-cli` loop { match receiver.recv().unwrap() { pushtx::Info::Done(Ok(report)) => { - println!("we successfully broadcast to {} peers", report.broadcasts); + println!("{} transactions broadcast successfully", report.success.len()); break; } pushtx::Info::Done(Err(err)) => { diff --git a/pushtx-cli/Cargo.toml b/pushtx-cli/Cargo.toml index 00146ec..5cad41c 100644 --- a/pushtx-cli/Cargo.toml +++ b/pushtx-cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pushtx-cli" -version = "0.3.0" +version = "0.4.0" edition = "2021" authors = ["Alfred Hodler "] license = "MIT" @@ -18,5 +18,5 @@ anyhow = "1.0.86" clap = { version = "4.5.4", features = ["derive"] } env_logger = { version = "0.11.3", default-features = false } log = "0.4.20" -pushtx = { version = "0.3.0", path = "../pushtx" } +pushtx = { version = "0.4.0", path = "../pushtx" } thiserror = "1.0.61" diff --git a/pushtx-cli/README.md b/pushtx-cli/README.md index 1770006..00731f3 100644 --- a/pushtx-cli/README.md +++ b/pushtx-cli/README.md @@ -15,8 +15,9 @@ also works. 1. Resolve peers through DNS seeds. 2. Detect if Tor is present. 3. Connect to 10 random peers, through Tor if possible. -4. Broadcast the transaction. -5. Disconnect. +4. Broadcast the transaction to a single peer. +5. Wait until the transaction is seen on the network. +6. Disconnect. ### Usage diff --git a/pushtx-cli/demo.gif b/pushtx-cli/demo.gif index 5412330..ec72549 100644 Binary files a/pushtx-cli/demo.gif and b/pushtx-cli/demo.gif differ diff --git a/pushtx-cli/src/main.rs b/pushtx-cli/src/main.rs index 7010a19..710fdd6 100644 --- a/pushtx-cli/src/main.rs +++ b/pushtx-cli/src/main.rs @@ -1,6 +1,7 @@ use pushtx::*; use core::panic; +use std::collections::HashSet; use std::io::Read; use std::path::PathBuf; @@ -17,20 +18,22 @@ use clap::Parser; /// /// More verbose (debug) output can be enabled by specifying the /// -v or --verbose switch up to three times. +/// +/// Copyright (c) 2024 Alfred Hodler #[derive(Parser)] #[command(version, about, long_about, verbatim_doc_comment, name = "pushtx")] struct Cli { - /// Tor mode. Default is `try`. - #[arg(short = 'm', long)] - tor_mode: Option, + /// Tor mode. + #[arg(short = 'm', long, default_value_t = TorMode::Try)] + tor_mode: TorMode, /// Dry-run mode. Performs the whole process except the sending part. #[arg(short, long)] dry_run: bool, - /// Connect to testnet instead of mainnet. - #[arg(short, long)] - testnet: bool, + /// The network to use. + #[arg(short, long, default_value_t = Network::Mainnet)] + network: Network, /// Zero or one paths to a file containing line-delimited hex encoded transactions /// @@ -104,16 +107,13 @@ fn main() -> anyhow::Result<()> { Err(err) => Err(err), }?; + let txids: HashSet<_> = txs.iter().map(|tx| tx.txid()).collect(); + let receiver = broadcast( txs, Opts { - use_tor: cli.tor_mode.unwrap_or_default().into(), - network: if cli.testnet { - Network::Testnet - } else { - Network::Mainnet - }, - send_unsolicited: true, + use_tor: cli.tor_mode.into(), + network: cli.network.into(), dry_run: cli.dry_run, ..Default::default() }, @@ -124,23 +124,31 @@ fn main() -> anyhow::Result<()> { Ok(Info::ResolvingPeers) => println!("* Resolving peers from DNS..."), Ok(Info::ResolvedPeers(n)) => println!("* Resolved {n} peers"), Ok(Info::ConnectingToNetwork { tor_status }) => { - let network = if cli.testnet { "testnet" } else { "mainnet" }; - println!("* Connecting to the P2P network ({network})..."); + println!("* Connecting to the P2P network ({})...", cli.network); match tor_status { Some(proxy) => println!(" - using Tor proxy found at {proxy}"), None => println!(" - not using Tor"), } } - Ok(Info::Broadcast { peer }) => println!("* Successful broadcast to peer {}", peer), - Ok(Info::Done(Ok(Report { - broadcasts, - rejects, - }))) => { - println!("* Done! Broadcast to {broadcasts} peers with {rejects} rejections"); - break Ok(()); + Ok(Info::Broadcast { peer }) => println!("* Broadcast to peer {}", peer), + Ok(Info::Done(Ok(Report { success, rejects }))) => { + let difference: Vec<_> = txids.difference(&success).collect(); + if difference.is_empty() { + println!("* Done! Broadcast successful"); + break Ok(()); + } else { + println!("* Failed to broadcast one or more transactions"); + for missing in difference { + println!(" - failed: {missing}"); + } + for (r_txid, r_reason) in rejects { + println!(" - reject: {r_txid}: {r_reason}"); + } + break Err(Error::Partial.into()); + } } Ok(Info::Done(Err(error))) => { - break Err(Error::FailedToBroadcast(error).into()); + break Err(Error::Broadcast(error).into()); } Err(_) => panic!("worker thread disconnected"), } @@ -156,14 +164,15 @@ enum Error { #[error("Empty transaction set, did you pass at least one transaction?")] EmptyTxSet, #[error("Failed to broadcast: {0}")] - FailedToBroadcast(pushtx::Error), + Broadcast(pushtx::Error), + #[error("Failed to broadcast one or more transactions")] + Partial, } /// Determines how to use Tor. -#[derive(Debug, Default, Clone, clap::ValueEnum)] +#[derive(Debug, Clone, clap::ValueEnum)] pub enum TorMode { /// Use Tor if available. If not available, connect through clearnet. - #[default] Try, /// Do not use Tor even if available and running. No, @@ -180,3 +189,43 @@ impl From for pushtx::TorMode { } } } + +impl std::fmt::Display for TorMode { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let name = match self { + TorMode::Try => "try", + TorMode::No => "no", + TorMode::Must => "must", + }; + write!(f, "{}", name) + } +} + +/// The Bitcoin network to connect to. +#[derive(Debug, Clone, Copy, clap::ValueEnum)] +pub enum Network { + Mainnet, + Testnet, + Signet, +} + +impl From for pushtx::Network { + fn from(value: Network) -> Self { + match value { + Network::Mainnet => Self::Mainnet, + Network::Testnet => Self::Testnet, + Network::Signet => Self::Signet, + } + } +} + +impl std::fmt::Display for Network { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let name = match self { + Network::Mainnet => "mainnet", + Network::Testnet => "testnet", + Network::Signet => "signet", + }; + write!(f, "{}", name) + } +} diff --git a/pushtx/Cargo.toml b/pushtx/Cargo.toml index c4086ad..e1b0140 100644 --- a/pushtx/Cargo.toml +++ b/pushtx/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pushtx" -version = "0.3.0" +version = "0.4.0" edition = "2021" authors = ["Alfred Hodler "] license = "MIT" @@ -18,5 +18,4 @@ fastrand = "2.0.2" hex = "0.4.3" log = "0.4.20" peerlink = { version = "0.8.0", features = ["socks"] } -port_check = "0.2.1" sha3 = "0.10.8" diff --git a/pushtx/README.md b/pushtx/README.md index 324e3d4..4375f7d 100644 --- a/pushtx/README.md +++ b/pushtx/README.md @@ -15,8 +15,9 @@ also works. 1. Resolve peers through DNS seeds. 2. Detect if Tor is present. 3. Connect to 10 random peers, through Tor if possible. -4. Broadcast the transaction. -5. Disconnect. +4. Broadcast the transaction to a single peer. +5. Wait until the transaction is seen on the network. +6. Disconnect. ### Usage @@ -31,7 +32,7 @@ also works. loop { match receiver.recv().unwrap() { pushtx::Info::Done(Ok(report)) => { - println!("we successfully broadcast to {} peers", report.broadcasts); + println!("{} transactions broadcast successfully", report.success.len()); break; } pushtx::Info::Done(Err(err)) => { diff --git a/pushtx/src/broadcast.rs b/pushtx/src/broadcast.rs index 000881e..eef6a8e 100644 --- a/pushtx/src/broadcast.rs +++ b/pushtx/src/broadcast.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::net::{Ipv4Addr, SocketAddr}; use std::time; use std::time::Duration; @@ -68,11 +68,13 @@ impl Runner { outbox.connect(*addr); } outbox.send().unwrap(); + let tx_map: HashMap<_, _> = self.tx.into_iter().map(|tx| (tx.0.txid(), tx.0)).collect(); + let mut acks = HashSet::new(); + let mut selected: Option> = None; let start = time::Instant::now(); - let mut broadcasts = 0; - let mut rejects = 0; + let mut rejects = HashMap::new(); loop { let mut need_replacements = 0; @@ -81,7 +83,7 @@ impl Runner { match p2p.recv_timeout(Duration::from_secs(1)).map(Into::into) { Ok(p2p::Event::ConnectedTo { target, result }) => match result { Ok(id) => { - log::info!("connected to peer @ {target}"); + log::info!("connected: peer @ {target}"); state.insert(id, Peer::Handshaking(target, Handshake::default())); outbox.version(id); } @@ -96,60 +98,44 @@ impl Runner { handshake::Event::Wait => {} handshake::Event::SendVerack => outbox.verack(peer), handshake::Event::Violation => { - log::warn!("peer {} violated handshake", s); + log::warn!("handshake violated: peer @ {}", s); state.remove(&peer); need_replacements += 1; } handshake::Event::Done { .. } => { - log::info!("handshake with {} done", s); let service = *s; - let used; - if self.opts.send_unsolicited { - used = true; - for tx in tx_map.values() { - log::info!("sending tx to {}", service); - if !self.opts.dry_run { - outbox.tx(peer, tx.to_owned()); - } - broadcasts += 1; - let _ = self.info_tx.send(Info::Broadcast { - peer: service.to_string(), - }); - } - } else { - used = false; - outbox.tx_inv(peer, tx_map.keys().cloned()); - } - state.insert(peer, Peer::Ready { service, used }); + log::info!("handshake complete: peer @ {}", s); + state.insert(peer, Peer::Ready { service }); } }, - Some(Peer::Ready { service, used }) => match message.payload() { - NetworkMessage::GetData(inv) => { + Some(Peer::Ready { service }) => match message.payload() { + NetworkMessage::Inv(inv) => { for inv in inv { if let Inventory::Transaction(wanted_txid) = inv { - if let Some(tx) = tx_map.get(wanted_txid) { - if !self.opts.dry_run { - outbox.tx(peer, tx.to_owned()); - } - *used = true; - broadcasts += 1; - let _ = self.info_tx.send(Info::Broadcast { - peer: service.to_string(), - }); + if tx_map.contains_key(wanted_txid) + && selected.as_ref().map(|s| s.id) != Some(peer) + { + log::info!( + "txid seen: peer @ {}: {}", + service, + wanted_txid + ); + acks.insert(*wanted_txid); } } } } NetworkMessage::Reject(reject) => { log::warn!( - "got a reject from {}: type={}, code={:?}, reason={}", + "reject: peer @ {}: type={}, code={:?}, reason={}", service, reject.message, reject.ccode, reject.reason ); if reject.message == "tx" { - rejects += 1; + let txid = crate::Txid(reject.hash.into()); + rejects.insert(txid, reject.reason.to_string()); } } _ => {} @@ -157,21 +143,15 @@ impl Runner { None => panic!("phantom peer {}", peer), }, - Ok(p2p::Event::Disconnected { peer, .. }) => match state.get_mut(&peer) { - Some( - Peer::Ready { - service, - used: false, + Ok(p2p::Event::Disconnected { peer, reason }) => match state.get_mut(&peer) { + Some(Peer::Ready { service } | Peer::Handshaking(service, _)) => { + log::info!("disconnected: peer @ {}, reason: {:?}", service, reason); + if selected.as_ref().map(|s| s.id) == Some(peer) { + selected = None; } - | Peer::Handshaking(service, _), - ) => { - log::info!("peer @ {service} left without letting us broadcast"); need_replacements += 1; state.remove(&peer); } - Some(_) => { - state.remove(&peer); - } None => panic!("phantom peer {}", peer), }, @@ -180,16 +160,46 @@ impl Runner { _ => {} } - // The strategy is as follows: we exponentially care less about each subsequent - // broadcast, so we add 2^broadcasts to the elapsed time. - let now = time::Instant::now(); - let elapsed = (now - start) + Duration::from_secs(1 << broadcasts); - if elapsed >= self.opts.max_time { - log::info!( - "spent {} secs with {} broadcasts, exit", - (now - start).as_secs(), - broadcasts - ); + match &selected { + Some(selected) if selected.is_stale() => { + log::warn!("rotating broadcast peer"); + outbox.disconnect(selected.id); + } + _ => {} + } + + if selected.is_none() { + let new_selected = state + .iter() + .filter_map(|(id, p)| match p { + Peer::Handshaking(_, _) => None, + Peer::Ready { service } => Some((*service, *id)), + }) + .next(); + + if let Some((service, id)) = new_selected { + log::info!("selected broadcast peer @ {service}"); + selected = Some(BroadcastPeer::new(id)); + for tx in tx_map.values() { + log::info!("broadcasting to {}", service); + if !self.opts.dry_run { + outbox.tx(id, tx.to_owned()); + } + } + let _ = self.info_tx.send(Info::Broadcast { + peer: service.to_string(), + }); + } + } + + let elapsed = time::Instant::now() - start; + + if self.opts.dry_run && elapsed.as_secs() > 3 { + acks.extend(tx_map.keys()); + } + + if acks.len() == tx_map.len() || elapsed >= self.opts.max_time { + log::info!("broadcast stop"); break; } @@ -201,16 +211,12 @@ impl Runner { client.send().unwrap(); } - std::thread::sleep(std::time::Duration::from_millis(500)); client.shutdown().join().unwrap().unwrap(); - let done = match broadcasts.try_into() { - Ok(broadcasts) => Ok(Report { - broadcasts, - rejects, - }), - Err(_) => Err(Error::Timeout), - }; - let _ = self.info_tx.send(Info::Done(done)); + let report = Ok(Report { + success: acks.into_iter().map(crate::Txid).collect(), + rejects, + }); + let _ = self.info_tx.send(Info::Done(report)); }); } } @@ -220,18 +226,43 @@ enum Peer { /// Currently handshaking. Handshaking(net::Service, Handshake), /// Handshake established, ready for interaction. - Ready { service: net::Service, used: bool }, + Ready { service: net::Service }, +} + +/// A single peer that we have selected for our transaction broadcast. +struct BroadcastPeer { + /// The id of the peer. + id: P, + /// The time the broadcast took place. + when: std::time::Instant, +} + +impl BroadcastPeer

{ + fn new(id: P) -> Self { + Self { + id, + when: std::time::Instant::now(), + } + } + /// Whether the peer is stale and should be rotated. + fn is_stale(&self) -> bool { + std::time::Instant::now() - self.when > Duration::from_secs(10) + } } /// Tries to detect a local Tor proxy on the usual ports. fn detect_tor_proxy() -> Option { + fn is_port_reachable(addr: SocketAddr) -> bool { + std::net::TcpStream::connect(addr).is_ok() + } + // Tor daemon has a SOCKS proxy on port 9050 - if port_check::is_port_reachable((Ipv4Addr::LOCALHOST, 9050)) { + if is_port_reachable((Ipv4Addr::LOCALHOST, 9050).into()) { return Some((Ipv4Addr::LOCALHOST, 9050).into()); } // Tor browser has a SOCKS proxy on port 9150 - if port_check::is_port_reachable((Ipv4Addr::LOCALHOST, 9150)) { + if is_port_reachable((Ipv4Addr::LOCALHOST, 9150).into()) { return Some((Ipv4Addr::LOCALHOST, 9150).into()); } diff --git a/pushtx/src/lib.rs b/pushtx/src/lib.rs index e159099..1f5ed4e 100644 --- a/pushtx/src/lib.rs +++ b/pushtx/src/lib.rs @@ -25,7 +25,7 @@ //! loop { //! match receiver.recv().unwrap() { //! pushtx::Info::Done(Ok(report)) => { -//! println!("we successfully broadcast to {} peers", report.broadcasts); +//! println!("{} transactions broadcast successfully", report.success.len()); //! break; //! } //! pushtx::Info::Done(Err(err)) => { @@ -43,12 +43,16 @@ mod net; mod p2p; mod seeds; -use std::{net::SocketAddr, num::NonZeroUsize, str::FromStr}; +use std::{ + collections::{HashMap, HashSet}, + net::SocketAddr, + str::FromStr, +}; use bitcoin::consensus::Decodable; /// A Bitcoin transaction to be broadcast into the network. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Transaction(bitcoin::Transaction); impl Transaction { @@ -63,8 +67,8 @@ impl Transaction { } /// Returns the txid of this transaction. - pub fn txid(&self) -> impl std::fmt::Display { - self.0.txid() + pub fn txid(&self) -> Txid { + Txid(self.0.txid()) } } @@ -87,6 +91,15 @@ impl TryFrom<&[u8]> for Transaction { } } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct Txid(bitcoin::Txid); + +impl std::fmt::Display for Txid { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + /// Why an input could not be interpereted as a valid transaction. #[derive(Debug)] pub enum ParseTxError { @@ -139,8 +152,8 @@ pub enum Network { #[default] Mainnet, Testnet, - Regtest, Signet, + Regtest, } impl From for bitcoin::Network { @@ -165,15 +178,9 @@ pub struct Opts { pub find_peer_strategy: FindPeerStrategy, /// The maximum allowed duration for broadcasting regardless of the result. Terminates afterward. pub max_time: std::time::Duration, - /// Normally, no transaction should be sent to a peer without first sending an `Inv` message - /// advertising the transaction and then waiting for the peer to respond with a `GetData` - /// message indicating that it does not indeed have the transaction. However, if we are certain - /// that our transactions have not been seen by the network, we can short-circuit this process - /// and simply send them out without the `Inv`-`GetData` exchange. - pub send_unsolicited: bool, /// Whether to simulate the broadcast. This means that every part of the process will be /// executed as normal, including connecting to actual peers, but the final part where the tx - /// is sent out is omitted (we pretend that the transaction really did go out.) + /// is sent out is omitted (we pretend that the transaction really did go out and was seen.) pub dry_run: bool, /// How many peers to connect to. pub target_peers: u8, @@ -189,7 +196,6 @@ impl Default for Opts { use_tor: Default::default(), find_peer_strategy: Default::default(), max_time: std::time::Duration::from_secs(40), - send_unsolicited: false, dry_run: false, target_peers: 10, ua: None, @@ -212,27 +218,25 @@ pub enum Info { Done(Result), } -/// An informational report on a successful broadcast process. +/// An informational report on a broadcast outcome. #[derive(Debug, Clone)] pub struct Report { - /// How many peers we managed to broadcast to. - pub broadcasts: NonZeroUsize, - /// How many rejects we got back. - pub rejects: usize, + /// The list of transactions that were sent out and then seen on the network. + pub success: HashSet, + /// The list of transactions that were rejected, along with the reason. + pub rejects: HashMap, } /// Possible error variants while broadcasting. #[derive(Debug, Clone)] pub enum Error { TorNotFound, - Timeout, } impl std::fmt::Display for Error { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Error::TorNotFound => write!(f, "Tor was required but a Tor proxy was not found"), - Error::Timeout => write!(f, "Time out"), } } } diff --git a/pushtx/src/p2p.rs b/pushtx/src/p2p.rs index decb3eb..945873b 100644 --- a/pushtx/src/p2p.rs +++ b/pushtx/src/p2p.rs @@ -30,9 +30,6 @@ pub trait Outbox { /// Queues a `VerAck` message for sending. fn verack(&self, peer: P); - /// Queues a `Inv` message with a set of txids for sending. - fn tx_inv(&self, peer: P, txids: impl Iterator); - /// Queues a `Tx` message for sending. fn tx(&self, peer: P, tx: bitcoin::Transaction); } diff --git a/pushtx/src/p2p/client.rs b/pushtx/src/p2p/client.rs index 984f854..353f276 100644 --- a/pushtx/src/p2p/client.rs +++ b/pushtx/src/p2p/client.rs @@ -3,7 +3,6 @@ use std::net::SocketAddr; use std::thread::JoinHandle; use bitcoin::p2p::message::{NetworkMessage, RawNetworkMessage}; -use bitcoin::p2p::message_blockdata::Inventory; use bitcoin::p2p::message_network::VersionMessage; use bitcoin::Network; use peerlink::PeerId; @@ -72,7 +71,7 @@ pub fn client( nonce: fastrand::u64(..), user_agent, start_height: start_height as i32, - relay: false, + relay: true, }, } } @@ -104,13 +103,6 @@ impl super::Outbox for Client { self.queue(self.message(peer, NetworkMessage::Verack)); } - fn tx_inv(&self, peer: PeerId, txids: impl Iterator) { - self.queue(self.message( - peer, - NetworkMessage::Inv(txids.map(Inventory::Transaction).collect()), - )) - } - fn tx(&self, peer: PeerId, tx: bitcoin::Transaction) { self.queue(self.message(peer, NetworkMessage::Tx(tx))) }