diff --git a/Cargo.toml b/Cargo.toml index ee31730..1fee6cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,16 +6,16 @@ edition = "2021" [dependencies] bitcoin = "0.30" hex-conservative = "0.2" -lightning = { version = "0.0.121" } -lightning-block-sync = { version = "0.0.121", features=["rest-client"] } -lightning-net-tokio = { version = "0.0.121" } +lightning = { version = "0.0.123" } +lightning-block-sync = { version = "0.0.123", features=["rest-client"] } +lightning-net-tokio = { version = "0.0.123" } tokio = { version = "1.25", features = ["full"] } tokio-postgres = { version = "=0.7.5" } futures = "0.3" [dev-dependencies] -lightning = { version = "0.0.121", features = ["_test_utils"] } -lightning-rapid-gossip-sync = { version = "0.0.121" } +lightning = { version = "0.0.123", features = ["_test_utils"] } +lightning-rapid-gossip-sync = { version = "0.0.123" } [profile.dev] panic = "abort" diff --git a/src/config.rs b/src/config.rs index 0941e78..c46ddf8 100644 --- a/src/config.rs +++ b/src/config.rs @@ -14,7 +14,7 @@ use lightning::util::ser::Readable; use lightning_block_sync::http::HttpEndpoint; use tokio_postgres::Config; -pub(crate) const SCHEMA_VERSION: i32 = 13; +pub(crate) const SCHEMA_VERSION: i32 = 14; pub(crate) const SYMLINK_GRANULARITY_INTERVAL: u32 = 3600 * 3; // three hours pub(crate) const MAX_SNAPSHOT_SCOPE: u32 = 3600 * 24 * 21; // three weeks // generate symlinks based on a 3-hour-granularity @@ -135,6 +135,18 @@ pub(crate) fn db_channel_update_table_creation_query() -> &'static str { )" } +pub(crate) fn db_node_announcement_table_creation_query() -> &'static str { + "CREATE TABLE IF NOT EXISTS node_announcements ( + id SERIAL PRIMARY KEY, + public_key varchar(66) NOT NULL, + features BYTEA NOT NULL, + socket_addresses BYTEA NOT NULL, + timestamp bigint NOT NULL, + announcement_signed BYTEA, + seen timestamp NOT NULL DEFAULT NOW() + )" +} + pub(crate) fn db_index_creation_query() -> &'static str { " CREATE INDEX IF NOT EXISTS channel_updates_seen_scid ON channel_updates(seen, short_channel_id); diff --git a/src/downloader.rs b/src/downloader.rs index af854c8..49e3019 100644 --- a/src/downloader.rs +++ b/src/downloader.rs @@ -14,6 +14,7 @@ use crate::types::{GossipMessage, GossipChainAccess, GossipPeerManager}; use crate::verifier::ChainVerifier; pub(crate) struct GossipCounter { + pub(crate) node_announcements: u64, pub(crate) channel_announcements: u64, pub(crate) channel_updates: u64, pub(crate) channel_updates_without_htlc_max_msats: u64, @@ -23,6 +24,7 @@ pub(crate) struct GossipCounter { impl GossipCounter { pub(crate) fn new() -> Self { Self { + node_announcements: 0, channel_announcements: 0, channel_updates: 0, channel_updates_without_htlc_max_msats: 0, @@ -71,6 +73,21 @@ impl GossipRouter where L::Target: Logger { } } + fn new_node_announcement(&self, msg: NodeAnnouncement) { + { + let mut counter = self.counter.write().unwrap(); + counter.node_announcements += 1; + } + + let gossip_message = GossipMessage::NodeAnnouncement(msg, None); + if let Err(err) = self.sender.try_send(gossip_message) { + let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg }; + tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move { + self.sender.send(gossip_message).await.unwrap(); + })}); + } + } + fn new_channel_update(&self, msg: ChannelUpdate) { self.counter.write().unwrap().channel_updates += 1; let gossip_message = GossipMessage::ChannelUpdate(msg, None); @@ -92,7 +109,9 @@ impl MessageSendEventsProvider for GossipRouter< MessageSendEvent::BroadcastChannelAnnouncement { msg, .. } => { self.new_channel_announcement(msg); }, - MessageSendEvent::BroadcastNodeAnnouncement { .. } => {}, + MessageSendEvent::BroadcastNodeAnnouncement { msg } => { + self.new_node_announcement(msg); + }, MessageSendEvent::BroadcastChannelUpdate { msg } => { self.new_channel_update(msg); }, @@ -105,7 +124,9 @@ impl MessageSendEventsProvider for GossipRouter< impl RoutingMessageHandler for GossipRouter where L::Target: Logger { fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result { - self.native_router.handle_node_announcement(msg) + let res = self.native_router.handle_node_announcement(msg)?; + self.new_node_announcement(msg.clone()); + Ok(res) } fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result { diff --git a/src/persistence.rs b/src/persistence.rs index a7cfb37..04c6b9a 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -69,21 +69,20 @@ impl GossipPersister where L::Target: Logger { panic!("db init error: {}", initialization_error); } - let initialization = client - .execute(config::db_announcement_table_creation_query(), &[]) - .await; - if let Err(initialization_error) = initialization { - panic!("db init error: {}", initialization_error); - } + let table_creation_queries = [ + config::db_announcement_table_creation_query(), + config::db_channel_update_table_creation_query(), + config::db_channel_update_table_creation_query(), + config::db_node_announcement_table_creation_query() + ]; - let initialization = client - .execute( - config::db_channel_update_table_creation_query(), - &[], - ) - .await; - if let Err(initialization_error) = initialization { - panic!("db init error: {}", initialization_error); + for current_table_creation_query in table_creation_queries { + let initialization = client + .execute(current_table_creation_query, &[]) + .await; + if let Err(initialization_error) = initialization { + panic!("db init error: {}", initialization_error); + } } let initialization = client @@ -133,6 +132,59 @@ impl GossipPersister where L::Target: Logger { let connections_cache_ref = Arc::clone(&connections_cache); match gossip_message { + GossipMessage::NodeAnnouncement(announcement, seen_override) => { + let public_key_hex = announcement.contents.node_id.to_string(); + + let mut announcement_signed = Vec::new(); + announcement.write(&mut announcement_signed).unwrap(); + + let features = announcement.contents.features.encode(); + let timestamp = announcement.contents.timestamp as i64; + + let mut serialized_addresses = Vec::new(); + announcement.contents.addresses.write(&mut serialized_addresses).unwrap(); + + let _task = self.tokio_runtime.spawn(async move { + if cfg!(test) && seen_override.is_some() { + tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client + .execute("INSERT INTO node_announcements (\ + public_key, \ + features, \ + socket_addresses, \ + timestamp, \ + announcement_signed, \ + seen \ + ) VALUES ($1, $2, $3, $4, $5, TO_TIMESTAMP($6))", &[ + &public_key_hex, + &features, + &serialized_addresses, + ×tamp, + &announcement_signed, + &(seen_override.unwrap() as f64) + ])).await.unwrap().unwrap(); + } else { + tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client + .execute("INSERT INTO node_announcements (\ + public_key, \ + features, \ + socket_addresses, \ + timestamp, \ + announcement_signed \ + ) VALUES ($1, $2, $3, $4, $5)", &[ + &public_key_hex, + &features, + &serialized_addresses, + ×tamp, + &announcement_signed, + ])).await.unwrap().unwrap(); + } + let mut connections_set = connections_cache_ref.lock().await; + connections_set.push(client); + limiter_ref.add_permits(1); + }); + #[cfg(test)] + tasks_spawned.push(_task); + }, GossipMessage::ChannelAnnouncement(announcement, seen_override) => { let scid = announcement.contents.short_channel_id as i64; diff --git a/src/tests/mod.rs b/src/tests/mod.rs index e668f0b..578fa9f 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -11,9 +11,9 @@ use bitcoin::secp256k1::{Secp256k1, SecretKey}; use bitcoin::hashes::Hash; use bitcoin::hashes::sha256d::Hash as Sha256dHash; use hex_conservative::DisplayHex; -use lightning::ln::features::ChannelFeatures; -use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, UnsignedChannelAnnouncement, UnsignedChannelUpdate}; -use lightning::routing::gossip::{NetworkGraph, NodeId}; +use lightning::ln::features::{ChannelFeatures, NodeFeatures}; +use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement, SocketAddress, UnsignedChannelAnnouncement, UnsignedChannelUpdate, UnsignedNodeAnnouncement}; +use lightning::routing::gossip::{NetworkGraph, NodeAlias, NodeId}; use lightning::util::ser::Writeable; use lightning_rapid_gossip_sync::RapidGossipSync; use crate::{config, serialize_delta}; @@ -47,7 +47,35 @@ pub(crate) fn db_test_schema() -> String { }) } -fn generate_announcement(short_channel_id: u64) -> ChannelAnnouncement { +fn generate_node_announcement() -> NodeAnnouncement { + let secp_context = Secp256k1::new(); + + let random_private_key = SecretKey::from_slice(&[1; 32]).unwrap(); + let random_public_key = random_private_key.public_key(&secp_context); + let node_id = NodeId::from_pubkey(&random_public_key); + + let announcement = UnsignedNodeAnnouncement { + features: NodeFeatures::empty(), + timestamp: 0, + node_id, + rgb: [0, 128, 255], + alias: NodeAlias([0; 32]), + addresses: vec![], + excess_data: vec![], + excess_address_data: vec![], + }; + + let msg_hash = bitcoin::secp256k1::Message::from_slice(&Sha256dHash::hash(&announcement.encode()[..])[..]).unwrap(); + let signature = secp_context.sign_ecdsa(&msg_hash, &random_private_key); + + NodeAnnouncement { + signature, + contents: announcement, + } +} + + +fn generate_channel_announcement(short_channel_id: u64) -> ChannelAnnouncement { let secp_context = Secp256k1::new(); let random_private_key_1 = SecretKey::from_slice(&[1; 32]).unwrap(); @@ -205,7 +233,7 @@ async fn test_trivial_setup() { println!("timestamp: {}", timestamp); { // seed the db - let announcement = generate_announcement(short_channel_id); + let announcement = generate_channel_announcement(short_channel_id); let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 5, 0); let update_2 = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 10, 0); @@ -265,6 +293,48 @@ async fn test_trivial_setup() { }).await.unwrap(); } +#[tokio::test] +async fn test_node_announcement_persistence() { + let _sanitizer = SchemaSanitizer::new(); + let logger = Arc::new(TestLogger::new()); + let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone()); + let network_graph_arc = Arc::new(network_graph); + let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()); + + { // seed the db + let mut announcement = generate_node_announcement(); + receiver.send(GossipMessage::NodeAnnouncement(announcement.clone(), None)).await.unwrap(); + receiver.send(GossipMessage::NodeAnnouncement(announcement.clone(), Some(12345))).await.unwrap(); + + { + // modify announcement to contain a bunch of addresses + announcement.contents.addresses.push(SocketAddress::Hostname { + hostname: "google.com".to_string().try_into().unwrap(), + port: 443, + }); + announcement.contents.addresses.push(SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 9635 }); + announcement.contents.addresses.push(SocketAddress::TcpIpV6 { addr: [1; 16], port: 1337 }); + announcement.contents.addresses.push(SocketAddress::OnionV2([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12])); + announcement.contents.addresses.push(SocketAddress::OnionV3 { + ed25519_pubkey: [1; 32], + checksum: 2, + version: 3, + port: 4, + }); + } + receiver.send(GossipMessage::NodeAnnouncement(announcement, Some(12345))).await.unwrap(); + + drop(receiver); + persister.persist_gossip().await; + + tokio::task::spawn_blocking(move || { + drop(persister); + }).await.unwrap(); + } + clean_test_db().await; +} + + /// If a channel has only seen updates in one direction, it should not be announced #[tokio::test] async fn test_unidirectional_intermediate_update_consideration() { @@ -280,7 +350,7 @@ async fn test_unidirectional_intermediate_update_consideration() { println!("timestamp: {}", timestamp); { // seed the db - let announcement = generate_announcement(short_channel_id); + let announcement = generate_channel_announcement(short_channel_id); let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 6, 0); let update_2 = generate_update(short_channel_id, true, timestamp + 1, 0, 0, 0, 3, 0); let update_3 = generate_update(short_channel_id, true, timestamp + 2, 0, 0, 0, 4, 0); @@ -348,7 +418,7 @@ async fn test_bidirectional_intermediate_update_consideration() { println!("timestamp: {}", timestamp); { // seed the db - let announcement = generate_announcement(short_channel_id); + let announcement = generate_channel_announcement(short_channel_id); let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 5, 0); let update_2 = generate_update(short_channel_id, false, timestamp + 1, 0, 0, 0, 4, 0); let update_3 = generate_update(short_channel_id, false, timestamp + 2, 0, 0, 0, 3, 0); @@ -407,7 +477,7 @@ async fn test_channel_reminders() { { // seed the db { // unupdated channel let short_channel_id = 1; - let announcement = generate_announcement(short_channel_id); + let announcement = generate_channel_announcement(short_channel_id); let update_1 = generate_update(short_channel_id, false, timestamp - channel_reminder_delta - 1, 0, 0, 0, 5, 0); let update_2 = generate_update(short_channel_id, true, timestamp - channel_reminder_delta - 1, 0, 0, 0, 3, 0); @@ -421,7 +491,7 @@ async fn test_channel_reminders() { } { // unmodified but updated channel let short_channel_id = 2; - let announcement = generate_announcement(short_channel_id); + let announcement = generate_channel_announcement(short_channel_id); let update_1 = generate_update(short_channel_id, false, timestamp - channel_reminder_delta - 10, 0, 0, 0, 5, 0); // in the false direction, we have one update that's different prior let update_2 = generate_update(short_channel_id, false, timestamp - channel_reminder_delta - 5, 0, 1, 0, 5, 0); @@ -488,7 +558,7 @@ async fn test_full_snapshot_recency() { { // seed the db let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()); - let announcement = generate_announcement(short_channel_id); + let announcement = generate_channel_announcement(short_channel_id); network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap(); receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap(); @@ -568,7 +638,7 @@ async fn test_full_snapshot_recency_with_wrong_seen_order() { { // seed the db let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()); - let announcement = generate_announcement(short_channel_id); + let announcement = generate_channel_announcement(short_channel_id); network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap(); receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap(); @@ -648,7 +718,7 @@ async fn test_full_snapshot_recency_with_wrong_propagation_order() { { // seed the db let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()); - let announcement = generate_announcement(short_channel_id); + let announcement = generate_channel_announcement(short_channel_id); network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap(); receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap(); @@ -730,7 +800,7 @@ async fn test_full_snapshot_mutiny_scenario() { { // seed the db let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()); - let announcement = generate_announcement(short_channel_id); + let announcement = generate_channel_announcement(short_channel_id); network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap(); receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap(); @@ -867,13 +937,13 @@ async fn test_full_snapshot_interlaced_channel_timestamps() { let secondary_channel_id = main_channel_id + 1; { // main channel - let announcement = generate_announcement(main_channel_id); + let announcement = generate_channel_announcement(main_channel_id); network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap(); receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap(); } { // secondary channel - let announcement = generate_announcement(secondary_channel_id); + let announcement = generate_channel_announcement(secondary_channel_id); network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap(); receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap(); } @@ -975,7 +1045,7 @@ async fn test_full_snapshot_persistence() { { // seed the db let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()); - let announcement = generate_announcement(short_channel_id); + let announcement = generate_channel_announcement(short_channel_id); network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap(); receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap(); diff --git a/src/types.rs b/src/types.rs index 0c6c9b2..f38a376 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use lightning::sign::KeysManager; -use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate}; +use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement}; use lightning::ln::peer_handler::{ErroringMessageHandler, IgnoringMessageHandler, PeerManager}; use lightning::util::logger::{Logger, Record}; use crate::config; @@ -14,6 +14,7 @@ pub(crate) type GossipPeerManager = Arc), // the second element is an optional override for the seen value ChannelAnnouncement(ChannelAnnouncement, Option), ChannelUpdate(ChannelUpdate, Option),