From 46bc4ee4d8ebe02018357485a621617484109041 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 22 Nov 2024 09:37:47 -0500 Subject: [PATCH 1/3] Poll mempool entries interatively Previously, we'd utilized `getrawmempool`'s `verbose` flag to retrieve additional information about mempool entries. However, when the mempool is full this could lead to errors as `lightning-block-sync`'s HTTP client limits responses to a maximum of ~8MB. Here, we switch to only query the non-verbose entries (i.e., txid only), and retrieve the needed information iteratively via follow-up calls to `getmempoolentry`, which shouldn't suffer from the same issue. --- src/chain/bitcoind_rpc.rs | 117 ++++++++++++++++++++++++++------------ 1 file changed, 80 insertions(+), 37 deletions(-) diff --git a/src/chain/bitcoind_rpc.rs b/src/chain/bitcoind_rpc.rs index 6e7360601..c0b4cf99e 100644 --- a/src/chain/bitcoind_rpc.rs +++ b/src/chain/bitcoind_rpc.rs @@ -113,14 +113,33 @@ impl BitcoindRpcClient { } } - pub(crate) async fn get_raw_mempool(&self) -> std::io::Result> { - let verbose_flag_json = serde_json::json!(true); + pub(crate) async fn get_raw_mempool(&self) -> std::io::Result> { + let verbose_flag_json = serde_json::json!(false); self.rpc_client .call_method::("getrawmempool", &vec![verbose_flag_json]) .await .map(|resp| resp.0) } + pub(crate) async fn get_mempool_entry(&self, txid: &Txid) -> std::io::Result { + let txid_hex = bitcoin::consensus::encode::serialize_hex(txid); + let txid_json = serde_json::json!(txid_hex); + self.rpc_client + .call_method::("getmempoolentry", &vec![txid_json]) + .await + .map(|resp| MempoolEntry { txid: txid.clone(), height: resp.height, time: resp.time }) + } + + pub(crate) async fn get_mempool_entries(&self) -> std::io::Result> { + let mempool_txids = self.get_raw_mempool().await?; + let mut mempool_entries = Vec::with_capacity(mempool_txids.len()); + for txid in &mempool_txids { + let entry = self.get_mempool_entry(txid).await?; + mempool_entries.push(entry); + } + Ok(mempool_entries) + } + /// Get mempool transactions, alongside their first-seen unix timestamps. /// /// This method is an adapted version of `bdk_bitcoind_rpc::Emitter::mempool`. It emits each @@ -132,7 +151,7 @@ impl BitcoindRpcClient { let prev_mempool_time = self.latest_mempool_timestamp.load(Ordering::Relaxed); let mut latest_time = prev_mempool_time; - let mempool_entries = self.get_raw_mempool().await?; + let mempool_entries = self.get_mempool_entries().await?; let mut txs_to_emit = Vec::new(); for entry in mempool_entries { @@ -254,58 +273,82 @@ impl TryInto for JsonResponse { } } -pub struct GetRawMempoolResponse(Vec); +pub struct GetRawMempoolResponse(Vec); impl TryInto for JsonResponse { type Error = std::io::Error; fn try_into(self) -> std::io::Result { - let mut mempool_transactions = Vec::new(); - let res = self.0.as_object().ok_or(std::io::Error::new( + let res = self.0.as_array().ok_or(std::io::Error::new( std::io::ErrorKind::Other, "Failed to parse getrawmempool response", ))?; - for (k, v) in res { - let txid = match bitcoin::consensus::encode::deserialize_hex(k) { - Ok(txid) => txid, - Err(_) => { - return Err(std::io::Error::new( - std::io::ErrorKind::Other, - "Failed to parse getrawmempool response", - )); - }, - }; - - let time = match v["time"].as_u64() { - Some(time) => time, - None => { - return Err(std::io::Error::new( - std::io::ErrorKind::Other, - "Failed to parse getrawmempool response", - )); - }, - }; + let mut mempool_transactions = Vec::with_capacity(res.len()); - let height = match v["height"].as_u64().and_then(|h| h.try_into().ok()) { - Some(height) => height, - None => { - return Err(std::io::Error::new( - std::io::ErrorKind::Other, - "Failed to parse getrawmempool response", - )); - }, + for hex in res { + let txid = if let Some(hex_str) = hex.as_str() { + match bitcoin::consensus::encode::deserialize_hex(hex_str) { + Ok(txid) => txid, + Err(_) => { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Failed to parse getrawmempool response", + )); + }, + } + } else { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Failed to parse getrawmempool response", + )); }; - let entry = RawMempoolEntry { txid, time, height }; - mempool_transactions.push(entry); + mempool_transactions.push(txid); } Ok(GetRawMempoolResponse(mempool_transactions)) } } +pub struct GetMempoolEntryResponse { + time: u64, + height: u32, +} + +impl TryInto for JsonResponse { + type Error = std::io::Error; + fn try_into(self) -> std::io::Result { + let res = self.0.as_object().ok_or(std::io::Error::new( + std::io::ErrorKind::Other, + "Failed to parse getmempoolentry response", + ))?; + + let time = match res["time"].as_u64() { + Some(time) => time, + None => { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Failed to parse getmempoolentry response", + )); + }, + }; + + let height = match res["height"].as_u64().and_then(|h| h.try_into().ok()) { + Some(height) => height, + None => { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Failed to parse getmempoolentry response", + )); + }, + }; + + Ok(GetMempoolEntryResponse { time, height }) + } +} + #[derive(Debug, Clone)] -pub(crate) struct RawMempoolEntry { +pub(crate) struct MempoolEntry { /// The transaction id txid: Txid, /// Local time transaction entered pool in seconds since 1 Jan 1970 GMT From 727f685ae84a19b08320b91398d58fc0ebfb7e71 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 22 Nov 2024 09:41:22 -0500 Subject: [PATCH 2/3] Relax chain polling interval to 2 seconds Polling every second may be overly aggressive, especially when we're polling the mempool. Here, we relax the chain polling interval a bit. --- src/chain/bitcoind_rpc.rs | 8 ++++---- src/chain/mod.rs | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/chain/bitcoind_rpc.rs b/src/chain/bitcoind_rpc.rs index c0b4cf99e..871dd8a42 100644 --- a/src/chain/bitcoind_rpc.rs +++ b/src/chain/bitcoind_rpc.rs @@ -121,19 +121,19 @@ impl BitcoindRpcClient { .map(|resp| resp.0) } - pub(crate) async fn get_mempool_entry(&self, txid: &Txid) -> std::io::Result { - let txid_hex = bitcoin::consensus::encode::serialize_hex(txid); + pub(crate) async fn get_mempool_entry(&self, txid: Txid) -> std::io::Result { + let txid_hex = bitcoin::consensus::encode::serialize_hex(&txid); let txid_json = serde_json::json!(txid_hex); self.rpc_client .call_method::("getmempoolentry", &vec![txid_json]) .await - .map(|resp| MempoolEntry { txid: txid.clone(), height: resp.height, time: resp.time }) + .map(|resp| MempoolEntry { txid, height: resp.height, time: resp.time }) } pub(crate) async fn get_mempool_entries(&self) -> std::io::Result> { let mempool_txids = self.get_raw_mempool().await?; let mut mempool_entries = Vec::with_capacity(mempool_txids.len()); - for txid in &mempool_txids { + for txid in mempool_txids { let entry = self.get_mempool_entry(txid).await?; mempool_entries.push(entry); } diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 3c5ffc27b..4343d8ad5 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -51,7 +51,7 @@ pub(crate) const DEFAULT_ESPLORA_SERVER_URL: &str = "https://blockstream.info/ap // The default Esplora client timeout we're using. pub(crate) const DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS: u64 = 10; -const CHAIN_POLLING_INTERVAL_SECS: u64 = 1; +const CHAIN_POLLING_INTERVAL_SECS: u64 = 2; pub(crate) enum WalletSyncStatus { Completed, From 002c4029c125a0e22b025dcce721849dadef087c Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 22 Nov 2024 16:31:06 -0500 Subject: [PATCH 3/3] Drop unnecessary `vec!` allocations in RPC calls --- src/chain/bitcoind_rpc.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/chain/bitcoind_rpc.rs b/src/chain/bitcoind_rpc.rs index 871dd8a42..336c296ec 100644 --- a/src/chain/bitcoind_rpc.rs +++ b/src/chain/bitcoind_rpc.rs @@ -51,7 +51,7 @@ impl BitcoindRpcClient { pub(crate) async fn broadcast_transaction(&self, tx: &Transaction) -> std::io::Result { let tx_serialized = bitcoin::consensus::encode::serialize_hex(tx); let tx_json = serde_json::json!(tx_serialized); - self.rpc_client.call_method::("sendrawtransaction", &vec![tx_json]).await + self.rpc_client.call_method::("sendrawtransaction", &[tx_json]).await } pub(crate) async fn get_fee_estimate_for_target( @@ -62,7 +62,7 @@ impl BitcoindRpcClient { self.rpc_client .call_method::( "estimatesmartfee", - &vec![num_blocks_json, estimation_mode_json], + &[num_blocks_json, estimation_mode_json], ) .await .map(|resp| resp.0) @@ -70,7 +70,7 @@ impl BitcoindRpcClient { pub(crate) async fn get_mempool_minimum_fee_rate(&self) -> std::io::Result { self.rpc_client - .call_method::("getmempoolinfo", &vec![]) + .call_method::("getmempoolinfo", &[]) .await .map(|resp| resp.0) } @@ -82,7 +82,7 @@ impl BitcoindRpcClient { let txid_json = serde_json::json!(txid_hex); match self .rpc_client - .call_method::("getrawtransaction", &vec![txid_json]) + .call_method::("getrawtransaction", &[txid_json]) .await { Ok(resp) => Ok(Some(resp.0)), @@ -116,7 +116,7 @@ impl BitcoindRpcClient { pub(crate) async fn get_raw_mempool(&self) -> std::io::Result> { let verbose_flag_json = serde_json::json!(false); self.rpc_client - .call_method::("getrawmempool", &vec![verbose_flag_json]) + .call_method::("getrawmempool", &[verbose_flag_json]) .await .map(|resp| resp.0) } @@ -125,7 +125,7 @@ impl BitcoindRpcClient { let txid_hex = bitcoin::consensus::encode::serialize_hex(&txid); let txid_json = serde_json::json!(txid_hex); self.rpc_client - .call_method::("getmempoolentry", &vec![txid_json]) + .call_method::("getmempoolentry", &[txid_json]) .await .map(|resp| MempoolEntry { txid, height: resp.height, time: resp.time }) }