Skip to content

Commit

Permalink
Merge pull request #410 from tnull/2024-11-iterative-mempool-retrieval
Browse files Browse the repository at this point in the history
Poll mempool entries interatively
  • Loading branch information
tnull authored Nov 23, 2024
2 parents 34fd852 + 002c402 commit 2753ac4
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 43 deletions.
127 changes: 85 additions & 42 deletions src/chain/bitcoind_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl BitcoindRpcClient {
pub(crate) async fn broadcast_transaction(&self, tx: &Transaction) -> std::io::Result<Txid> {
let tx_serialized = bitcoin::consensus::encode::serialize_hex(tx);
let tx_json = serde_json::json!(tx_serialized);
self.rpc_client.call_method::<Txid>("sendrawtransaction", &vec![tx_json]).await
self.rpc_client.call_method::<Txid>("sendrawtransaction", &[tx_json]).await
}

pub(crate) async fn get_fee_estimate_for_target(
Expand All @@ -62,15 +62,15 @@ impl BitcoindRpcClient {
self.rpc_client
.call_method::<FeeResponse>(
"estimatesmartfee",
&vec![num_blocks_json, estimation_mode_json],
&[num_blocks_json, estimation_mode_json],
)
.await
.map(|resp| resp.0)
}

pub(crate) async fn get_mempool_minimum_fee_rate(&self) -> std::io::Result<FeeRate> {
self.rpc_client
.call_method::<MempoolMinFeeResponse>("getmempoolinfo", &vec![])
.call_method::<MempoolMinFeeResponse>("getmempoolinfo", &[])
.await
.map(|resp| resp.0)
}
Expand All @@ -82,7 +82,7 @@ impl BitcoindRpcClient {
let txid_json = serde_json::json!(txid_hex);
match self
.rpc_client
.call_method::<GetRawTransactionResponse>("getrawtransaction", &vec![txid_json])
.call_method::<GetRawTransactionResponse>("getrawtransaction", &[txid_json])
.await
{
Ok(resp) => Ok(Some(resp.0)),
Expand Down Expand Up @@ -113,14 +113,33 @@ impl BitcoindRpcClient {
}
}

pub(crate) async fn get_raw_mempool(&self) -> std::io::Result<Vec<RawMempoolEntry>> {
let verbose_flag_json = serde_json::json!(true);
pub(crate) async fn get_raw_mempool(&self) -> std::io::Result<Vec<Txid>> {
let verbose_flag_json = serde_json::json!(false);
self.rpc_client
.call_method::<GetRawMempoolResponse>("getrawmempool", &vec![verbose_flag_json])
.call_method::<GetRawMempoolResponse>("getrawmempool", &[verbose_flag_json])
.await
.map(|resp| resp.0)
}

pub(crate) async fn get_mempool_entry(&self, txid: Txid) -> std::io::Result<MempoolEntry> {
let txid_hex = bitcoin::consensus::encode::serialize_hex(&txid);
let txid_json = serde_json::json!(txid_hex);
self.rpc_client
.call_method::<GetMempoolEntryResponse>("getmempoolentry", &[txid_json])
.await
.map(|resp| MempoolEntry { txid, height: resp.height, time: resp.time })
}

pub(crate) async fn get_mempool_entries(&self) -> std::io::Result<Vec<MempoolEntry>> {
let mempool_txids = self.get_raw_mempool().await?;
let mut mempool_entries = Vec::with_capacity(mempool_txids.len());
for txid in mempool_txids {
let entry = self.get_mempool_entry(txid).await?;
mempool_entries.push(entry);
}
Ok(mempool_entries)
}

/// Get mempool transactions, alongside their first-seen unix timestamps.
///
/// This method is an adapted version of `bdk_bitcoind_rpc::Emitter::mempool`. It emits each
Expand All @@ -132,7 +151,7 @@ impl BitcoindRpcClient {
let prev_mempool_time = self.latest_mempool_timestamp.load(Ordering::Relaxed);
let mut latest_time = prev_mempool_time;

let mempool_entries = self.get_raw_mempool().await?;
let mempool_entries = self.get_mempool_entries().await?;
let mut txs_to_emit = Vec::new();

for entry in mempool_entries {
Expand Down Expand Up @@ -254,58 +273,82 @@ impl TryInto<GetRawTransactionResponse> for JsonResponse {
}
}

pub struct GetRawMempoolResponse(Vec<RawMempoolEntry>);
pub struct GetRawMempoolResponse(Vec<Txid>);

impl TryInto<GetRawMempoolResponse> for JsonResponse {
type Error = std::io::Error;
fn try_into(self) -> std::io::Result<GetRawMempoolResponse> {
let mut mempool_transactions = Vec::new();
let res = self.0.as_object().ok_or(std::io::Error::new(
let res = self.0.as_array().ok_or(std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to parse getrawmempool response",
))?;

for (k, v) in res {
let txid = match bitcoin::consensus::encode::deserialize_hex(k) {
Ok(txid) => txid,
Err(_) => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to parse getrawmempool response",
));
},
};

let time = match v["time"].as_u64() {
Some(time) => time,
None => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to parse getrawmempool response",
));
},
};
let mut mempool_transactions = Vec::with_capacity(res.len());

let height = match v["height"].as_u64().and_then(|h| h.try_into().ok()) {
Some(height) => height,
None => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to parse getrawmempool response",
));
},
for hex in res {
let txid = if let Some(hex_str) = hex.as_str() {
match bitcoin::consensus::encode::deserialize_hex(hex_str) {
Ok(txid) => txid,
Err(_) => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to parse getrawmempool response",
));
},
}
} else {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to parse getrawmempool response",
));
};
let entry = RawMempoolEntry { txid, time, height };

mempool_transactions.push(entry);
mempool_transactions.push(txid);
}

Ok(GetRawMempoolResponse(mempool_transactions))
}
}

pub struct GetMempoolEntryResponse {
time: u64,
height: u32,
}

impl TryInto<GetMempoolEntryResponse> for JsonResponse {
type Error = std::io::Error;
fn try_into(self) -> std::io::Result<GetMempoolEntryResponse> {
let res = self.0.as_object().ok_or(std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to parse getmempoolentry response",
))?;

let time = match res["time"].as_u64() {
Some(time) => time,
None => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to parse getmempoolentry response",
));
},
};

let height = match res["height"].as_u64().and_then(|h| h.try_into().ok()) {
Some(height) => height,
None => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to parse getmempoolentry response",
));
},
};

Ok(GetMempoolEntryResponse { time, height })
}
}

#[derive(Debug, Clone)]
pub(crate) struct RawMempoolEntry {
pub(crate) struct MempoolEntry {
/// The transaction id
txid: Txid,
/// Local time transaction entered pool in seconds since 1 Jan 1970 GMT
Expand Down
2 changes: 1 addition & 1 deletion src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub(crate) const DEFAULT_ESPLORA_SERVER_URL: &str = "https://blockstream.info/ap
// The default Esplora client timeout we're using.
pub(crate) const DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS: u64 = 10;

const CHAIN_POLLING_INTERVAL_SECS: u64 = 1;
const CHAIN_POLLING_INTERVAL_SECS: u64 = 2;

pub(crate) enum WalletSyncStatus {
Completed,
Expand Down

0 comments on commit 2753ac4

Please sign in to comment.