diff --git a/src/builder.rs b/src/builder.rs index d4987d805..ceb3c0918 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1126,6 +1126,12 @@ fn build_with_store_internal( liquidity_source.as_ref().map(|l| l.set_peer_manager(Arc::clone(&peer_manager))); + gossip_source.set_gossip_verifier( + Arc::clone(&chain_source), + Arc::clone(&peer_manager), + Arc::clone(&runtime), + ); + let connection_manager = Arc::new(ConnectionManager::new(Arc::clone(&peer_manager), Arc::clone(&logger))); diff --git a/src/chain/bitcoind_rpc.rs b/src/chain/bitcoind_rpc.rs index 3a3073378..400d3963c 100644 --- a/src/chain/bitcoind_rpc.rs +++ b/src/chain/bitcoind_rpc.rs @@ -45,6 +45,10 @@ impl BitcoindRpcClient { Self { rpc_client, latest_mempool_timestamp } } + pub(crate) fn rpc_client(&self) -> Arc { + Arc::clone(&self.rpc_client) + } + pub(crate) async fn broadcast_transaction(&self, tx: &Transaction) -> std::io::Result { let tx_serialized = bitcoin::consensus::encode::serialize_hex(tx); let tx_json = serde_json::json!(tx_serialized); diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 58d8a6417..acd2cb2cb 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -29,6 +29,7 @@ use lightning::chain::chaininterface::ConfirmationTarget as LdkConfirmationTarge use lightning::chain::{Confirm, Filter, Listen}; use lightning::util::ser::Writeable; +use lightning_block_sync::gossip::UtxoSource; use lightning_transaction_sync::EsploraSyncClient; use lightning_block_sync::init::{synchronize_listeners, validate_best_block_header}; @@ -192,6 +193,13 @@ impl ChainSource { } } + pub(crate) fn as_utxo_source(&self) -> Option> { + match self { + Self::BitcoindRpc { bitcoind_rpc_client, .. } => Some(bitcoind_rpc_client.rpc_client()), + _ => None, + } + } + pub(crate) async fn continuously_sync_wallets( &self, mut stop_sync_receiver: tokio::sync::watch::Receiver<()>, channel_manager: Arc, chain_monitor: Arc, diff --git a/src/gossip.rs b/src/gossip.rs index 450b5b5ee..45ceb536f 100644 --- a/src/gossip.rs +++ b/src/gossip.rs @@ -5,20 +5,23 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. +use crate::chain::ChainSource; use crate::config::RGS_SYNC_TIMEOUT_SECS; -use crate::logger::{log_trace, FilesystemLogger, Logger}; -use crate::types::{GossipSync, Graph, P2PGossipSync, RapidGossipSync}; +use crate::logger::{log_error, log_trace, FilesystemLogger, Logger}; +use crate::types::{GossipSync, Graph, P2PGossipSync, PeerManager, RapidGossipSync, UtxoLookup}; use crate::Error; -use lightning::routing::utxo::UtxoLookup; +use lightning_block_sync::gossip::{FutureSpawner, GossipVerifier}; +use std::future::Future; use std::sync::atomic::{AtomicU32, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::time::Duration; pub(crate) enum GossipSource { P2PNetwork { gossip_sync: Arc, + logger: Arc, }, RapidGossipSync { gossip_sync: Arc, @@ -32,10 +35,10 @@ impl GossipSource { pub fn new_p2p(network_graph: Arc, logger: Arc) -> Self { let gossip_sync = Arc::new(P2PGossipSync::new( network_graph, - None::>, - logger, + None::>, + Arc::clone(&logger), )); - Self::P2PNetwork { gossip_sync } + Self::P2PNetwork { gossip_sync, logger } } pub fn new_rgs( @@ -58,9 +61,30 @@ impl GossipSource { } } + pub(crate) fn set_gossip_verifier( + &self, chain_source: Arc, peer_manager: Arc, + runtime: Arc>>>, + ) { + match self { + Self::P2PNetwork { gossip_sync, logger } => { + if let Some(utxo_source) = chain_source.as_utxo_source() { + let spawner = RuntimeSpawner::new(Arc::clone(&runtime), Arc::clone(&logger)); + let gossip_verifier = Arc::new(GossipVerifier::new( + utxo_source, + spawner, + Arc::clone(gossip_sync), + peer_manager, + )); + gossip_sync.add_utxo_lookup(Some(gossip_verifier)); + } + }, + _ => (), + } + } + pub async fn update_rgs_snapshot(&self) -> Result { match self { - Self::P2PNetwork { gossip_sync: _ } => Ok(0), + Self::P2PNetwork { gossip_sync: _, .. } => Ok(0), Self::RapidGossipSync { gossip_sync, server_url, latest_sync_timestamp, logger } => { let query_timestamp = latest_sync_timestamp.load(Ordering::Acquire); let query_url = format!("{}/{}", server_url, query_timestamp); @@ -101,3 +125,30 @@ impl GossipSource { } } } + +pub(crate) struct RuntimeSpawner { + runtime: Arc>>>, + logger: Arc, +} + +impl RuntimeSpawner { + pub(crate) fn new( + runtime: Arc>>>, logger: Arc, + ) -> Self { + Self { runtime, logger } + } +} + +impl FutureSpawner for RuntimeSpawner { + fn spawn + Send + 'static>(&self, future: T) { + let rt_lock = self.runtime.read().unwrap(); + if rt_lock.is_none() { + log_error!(self.logger, "Tried spawing a future while the runtime wasn't available. This should never happen."); + debug_assert!(false, "Tried spawing a future while the runtime wasn't available. This should never happen."); + return; + } + + let runtime = rt_lock.as_ref().unwrap(); + runtime.spawn(future); + } +} diff --git a/src/types.rs b/src/types.rs index fc1ea8d31..4d0e892fe 100644 --- a/src/types.rs +++ b/src/types.rs @@ -8,6 +8,7 @@ use crate::chain::ChainSource; use crate::config::ChannelConfig; use crate::fee_estimator::OnchainFeeEstimator; +use crate::gossip::RuntimeSpawner; use crate::logger::FilesystemLogger; use crate::message_handler::NodeCustomMessageHandler; @@ -25,6 +26,9 @@ use lightning::sign::InMemorySigner; use lightning::util::persist::KVStore; use lightning::util::ser::{Readable, Writeable, Writer}; use lightning::util::sweep::OutputSweeper; + +use lightning_block_sync::gossip::{GossipVerifier, UtxoSource}; + use lightning_net_tokio::SocketDescriptor; use bitcoin::secp256k1::PublicKey; @@ -91,7 +95,8 @@ pub(crate) type Scorer = ProbabilisticScorer, Arc>; pub(crate) type Graph = gossip::NetworkGraph>; -pub(crate) type UtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync; +pub(crate) type UtxoLookup = + GossipVerifier, Arc>; pub(crate) type P2PGossipSync = lightning::routing::gossip::P2PGossipSync, Arc, Arc>;