From 71b734587c3abd115ad63ecd34b70f8a3a97a6c7 Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Fri, 28 Jun 2024 13:57:48 -0400 Subject: [PATCH 1/9] Parametrize prune interval. Create a config variable representing the time interval whereafter graph data gets pruned. This value should be used to limit lookup time frames. --- src/config.rs | 4 ++++ src/lookup.rs | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/config.rs b/src/config.rs index 4950ec0..a302fb7 100644 --- a/src/config.rs +++ b/src/config.rs @@ -23,6 +23,10 @@ pub(crate) const MAX_SNAPSHOT_SCOPE: u32 = 3600 * 24 * 21; // three weeks /// updates in both directions. pub(crate) const CHANNEL_REMINDER_AGE: Duration = Duration::from_secs(6 * 24 * 60 * 60); +/// The interval after which graph data gets pruned after it was first seen +/// This should match the LDK default pruning interval, which is 14 days +pub(crate) const PRUNE_INTERVAL: Duration = Duration::from_secs(14 * 24 * 60 * 60); + /// Maximum number of default features to calculate for node announcements pub(crate) const NODE_DEFAULT_FEATURE_COUNT: u8 = 6; diff --git a/src/lookup.rs b/src/lookup.rs index 4337508..776b1ab 100644 --- a/src/lookup.rs +++ b/src/lookup.rs @@ -231,7 +231,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaS let reminder_threshold_timestamp = current_timestamp.checked_sub(config::CHANNEL_REMINDER_AGE.as_secs()).unwrap() as f64; log_info!(logger, "Fetch first time we saw the current value combination for each direction (prior mutations excepted)"); - let reminder_lookup_threshold_timestamp = current_timestamp.checked_sub(config::CHANNEL_REMINDER_AGE.as_secs() * 3).unwrap() as f64; + let reminder_lookup_threshold_timestamp = current_timestamp.checked_sub(config::PRUNE_INTERVAL.as_secs()).unwrap() as f64; let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&channel_ids, &reminder_lookup_threshold_timestamp]; /* From 86104e585cfb8a8e91aa391ce82c00cb4fb11d08 Mon Sep 17 
00:00:00 2001 From: Arik Sosman Date: Fri, 28 Jun 2024 17:13:05 -0400 Subject: [PATCH 2/9] Rename latest_details_after_seen to latest_details. If the latest node details have already been seen by a client, we still need to store them for correctly detecting reminder necessity in the future. --- src/lib.rs | 4 ++-- src/lookup.rs | 6 +++--- src/serialization.rs | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 4da8f38..9b66e73 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -320,7 +320,7 @@ fn serialize_delta(serialization_details: &SerializationSet, s if node_delta.has_address_set_changed { node_address_update_count += 1; - let address_set = &node_delta.latest_details_after_seen.as_ref().unwrap().addresses; + let address_set = &node_delta.latest_details.as_ref().unwrap().addresses; let mut address_serialization = Vec::new(); // we don't know a priori how many are <= 255 bytes @@ -348,7 +348,7 @@ fn serialize_delta(serialization_details: &SerializationSet, s if node_delta.has_feature_set_changed { node_feature_update_count += 1; - let latest_features = &node_delta.latest_details_after_seen.as_ref().unwrap().features; + let latest_features = &node_delta.latest_details.as_ref().unwrap().features; // are these features among the most common ones? 
if let Some(index) = serialization_details.node_announcement_feature_defaults.iter().position(|f| f == latest_features) { diff --git a/src/lookup.rs b/src/lookup.rs index 776b1ab..302268b 100644 --- a/src/lookup.rs +++ b/src/lookup.rs @@ -54,7 +54,7 @@ pub(super) struct ChannelDelta { pub(super) struct NodeDelta { /// The most recently received, but new-to-the-client, node details - pub(super) latest_details_after_seen: Option, + pub(super) latest_details: Option, /// Between last_details_before_seen and latest_details_after_seen, including any potential /// intermediate updates that are not kept track of here, has the set of features this node @@ -91,7 +91,7 @@ impl Default for ChannelDelta { impl Default for NodeDelta { fn default() -> Self { Self { - latest_details_after_seen: None, + latest_details: None, has_feature_set_changed: false, has_address_set_changed: false, last_details_before_seen: None, @@ -571,7 +571,7 @@ pub(super) async fn fetch_node_updates(client: &Client, last_sync_time } if !is_previously_processed_node_id { - (*current_node_delta).latest_details_after_seen.get_or_insert(NodeDetails { + (*current_node_delta).latest_details.get_or_insert(NodeDetails { seen: current_seen_timestamp, features: unsigned_node_announcement.features, addresses: address_set, diff --git a/src/serialization.rs b/src/serialization.rs index 5f11b27..a527b45 100644 --- a/src/serialization.rs +++ b/src/serialization.rs @@ -228,7 +228,7 @@ pub(super) fn serialize_delta_set(channel_delta_set: DeltaSet, node_delta_set: N let mut node_feature_histogram: HashMap<&NodeFeatures, usize> = Default::default(); for (_id, delta) in serialization_set.node_mutations.iter() { if delta.has_feature_set_changed || delta.last_details_before_seen.is_none() { - if let Some(latest_details) = delta.latest_details_after_seen.as_ref() { + if let Some(latest_details) = delta.latest_details.as_ref() { *node_feature_histogram.entry(&latest_details.features).or_insert(0) += 1; }; } From 
8219e5d119011f53983e733dd29e166860a5d90e Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Mon, 24 Jun 2024 14:36:13 -0400 Subject: [PATCH 3/9] Upgrade LDK to 0.0.124. --- Cargo.toml | 14 +++++++------- src/config.rs | 4 ++-- src/lookup.rs | 17 +++++++++-------- src/persistence.rs | 6 +++--- src/serialization.rs | 2 +- src/tests/mod.rs | 3 ++- 6 files changed, 24 insertions(+), 22 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1fee6cd..671d41a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,18 +4,18 @@ version = "0.1.0" edition = "2021" [dependencies] -bitcoin = "0.30" -hex-conservative = "0.2" -lightning = { version = "0.0.123" } -lightning-block-sync = { version = "0.0.123", features=["rest-client"] } -lightning-net-tokio = { version = "0.0.123" } +bitcoin = "0.32.2" +hex-conservative = "0.2.1" +lightning = { version = "0.0.124" } +lightning-block-sync = { version = "0.0.124", features=["rest-client"] } +lightning-net-tokio = { version = "0.0.124" } tokio = { version = "1.25", features = ["full"] } tokio-postgres = { version = "=0.7.5" } futures = "0.3" [dev-dependencies] -lightning = { version = "0.0.123", features = ["_test_utils"] } -lightning-rapid-gossip-sync = { version = "0.0.123" } +lightning = { version = "0.0.124", features = ["_test_utils"] } +lightning-rapid-gossip-sync = { version = "0.0.124" } [profile.dev] panic = "abort" diff --git a/src/config.rs b/src/config.rs index a302fb7..6220fc1 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,10 +1,10 @@ use crate::hex_utils; use std::env; -use std::io::Cursor; use std::net::{SocketAddr, ToSocketAddrs}; use std::time::Duration; +use bitcoin::io::Cursor; use bitcoin::Network; use bitcoin::hashes::hex::FromHex; use bitcoin::secp256k1::PublicKey; @@ -223,7 +223,7 @@ pub(crate) async fn upgrade_db(schema: i32, client: &mut tokio_postgres::Client) let announcement: Vec = row.get("announcement_signed"); let tx_ref = &tx; updates.push(async move { - let scid = ChannelAnnouncement::read(&mut 
Cursor::new(announcement)).unwrap().contents.short_channel_id as i64; + let scid = ChannelAnnouncement::read(&mut Cursor::new(&announcement)).unwrap().contents.short_channel_id as i64; assert!(scid > 0); // Will roll over in some 150 years or so tx_ref.execute("UPDATE channel_announcements SET short_channel_id = $1 WHERE id = $2", &[&scid, &id]).await.unwrap(); }); diff --git a/src/lookup.rs b/src/lookup.rs index 302268b..9a584c4 100644 --- a/src/lookup.rs +++ b/src/lookup.rs @@ -1,9 +1,10 @@ use std::collections::{BTreeMap, HashMap, HashSet}; -use std::io::Cursor; use std::ops::Deref; use std::sync::Arc; use std::time::{Instant, SystemTime, UNIX_EPOCH}; +use bitcoin::io::Cursor; + use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement, SocketAddress, UnsignedChannelAnnouncement, UnsignedChannelUpdate}; use lightning::routing::gossip::{NetworkGraph, NodeId}; use lightning::util::ser::Readable; @@ -160,7 +161,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaS while let Some(row_res) = pinned_rows.next().await { let current_announcement_row = row_res.unwrap(); let blob: Vec = current_announcement_row.get("announcement_signed"); - let mut readable = Cursor::new(blob); + let mut readable = Cursor::new(&blob); let unsigned_announcement = ChannelAnnouncement::read(&mut readable).unwrap().contents; let scid = unsigned_announcement.short_channel_id; @@ -287,7 +288,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaS if seen < reminder_threshold_timestamp as u32 { let blob: Vec = current_row.get("blob_signed"); - let mut readable = Cursor::new(blob); + let mut readable = Cursor::new(&blob); let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents; let scid = unsigned_channel_update.short_channel_id; @@ -365,7 +366,7 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, cl let direction: bool = current_reference.get("direction"); let seen = 
current_reference.get::<_, i64>("seen") as u32; let blob: Vec = current_reference.get("blob_signed"); - let mut readable = Cursor::new(blob); + let mut readable = Cursor::new(&blob); let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents; let scid = unsigned_channel_update.short_channel_id; @@ -415,7 +416,7 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, cl let direction: bool = intermediate_update.get("direction"); let current_seen_timestamp = intermediate_update.get::<_, i64>("seen") as u32; let blob: Vec = intermediate_update.get("blob_signed"); - let mut readable = Cursor::new(blob); + let mut readable = Cursor::new(&blob); let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents; let scid = unsigned_channel_update.short_channel_id; @@ -451,7 +452,7 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, cl // determine mutations if let Some(last_seen_update) = update_delta.last_update_before_seen.as_ref() { - if unsigned_channel_update.flags != last_seen_update.update.flags { + if unsigned_channel_update.channel_flags != last_seen_update.update.channel_flags { update_delta.mutated_properties.flags = true; } if unsigned_channel_update.cltv_expiry_delta != last_seen_update.update.cltv_expiry_delta { @@ -498,7 +499,7 @@ pub(super) async fn fetch_node_updates(client: &Client, last_sync_time let seen = current_reference.get::<_, i64>("seen") as u32; let blob: Vec = current_reference.get("announcement_signed"); - let mut readable = Cursor::new(blob); + let mut readable = Cursor::new(&blob); let unsigned_node_announcement = NodeAnnouncement::read(&mut readable).unwrap().contents; let node_id = unsigned_node_announcement.node_id; @@ -541,7 +542,7 @@ pub(super) async fn fetch_node_updates(client: &Client, last_sync_time let current_seen_timestamp = intermediate_update.get::<_, i64>("seen") as u32; let blob: Vec = intermediate_update.get("announcement_signed"); - let mut readable 
= Cursor::new(blob); + let mut readable = Cursor::new(&blob); let unsigned_node_announcement = NodeAnnouncement::read(&mut readable).unwrap().contents; let node_id = unsigned_node_announcement.node_id; diff --git a/src/persistence.rs b/src/persistence.rs index 04c6b9a..0db3091 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -226,8 +226,8 @@ impl GossipPersister where L::Target: Logger { let timestamp = update.contents.timestamp as i64; - let direction = (update.contents.flags & 1) == 1; - let disable = (update.contents.flags & 2) > 0; + let direction = (update.contents.channel_flags & 1) == 1; + let disable = (update.contents.channel_flags & 2) > 0; let cltv_expiry_delta = update.contents.cltv_expiry_delta as i32; let htlc_minimum_msat = update.contents.htlc_minimum_msat as i64; @@ -281,7 +281,7 @@ impl GossipPersister where L::Target: Logger { ×tamp, #[cfg(test)] &_seen_timestamp, - &(update.contents.flags as i16), + &(update.contents.channel_flags as i16), &direction, &disable, &cltv_expiry_delta, diff --git a/src/serialization.rs b/src/serialization.rs index a527b45..74c92ca 100644 --- a/src/serialization.rs +++ b/src/serialization.rs @@ -93,7 +93,7 @@ impl UpdateSerialization { fn flags(&self) -> u8 { match self { UpdateSerialization::Full(latest_update)| - UpdateSerialization::Incremental(latest_update, _) => latest_update.flags, + UpdateSerialization::Incremental(latest_update, _) => latest_update.channel_flags, UpdateSerialization::Reminder(_, flags) => *flags, } } diff --git a/src/tests/mod.rs b/src/tests/mod.rs index 7290e6e..2e0e0c1 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -118,7 +118,8 @@ fn generate_update(scid: u64, direction: bool, timestamp: u32, expiry_delta: u16 chain_hash: genesis_hash(), short_channel_id: scid, timestamp, - flags: 0 | flag_mask, + message_flags: 0, + channel_flags: flag_mask, cltv_expiry_delta: expiry_delta, htlc_minimum_msat: min_msat, htlc_maximum_msat: max_msat, From 
63aaf6111213a74386a3cbc76ec7e2bf6b0b7f9c Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Tue, 2 Jul 2024 00:20:19 -0700 Subject: [PATCH 4/9] Only consider node announcements from current graph. We want to ignore any node announcements that have already been pruned. To do so, we extract all the node IDs from the network graph, and use those to filter our queries. --- src/lib.rs | 4 ++-- src/lookup.rs | 42 ++++++++++++++++++++++++++++++++++++------ 2 files changed, 38 insertions(+), 8 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 9b66e73..21e21d6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -187,11 +187,11 @@ async fn calculate_delta(network_graph: Arc>, // for announcement-free incremental-only updates, chain hash can be skipped let mut delta_set = DeltaSet::new(); - lookup::fetch_channel_announcements(&mut delta_set, network_graph, &client, last_sync_timestamp, snapshot_reference_timestamp, logger.clone()).await; + lookup::fetch_channel_announcements(&mut delta_set, Arc::clone(&network_graph), &client, last_sync_timestamp, snapshot_reference_timestamp, logger.clone()).await; log_info!(logger, "announcement channel count: {}", delta_set.len()); lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp, logger.clone()).await; log_info!(logger, "update-fetched channel count: {}", delta_set.len()); - let node_delta_set = lookup::fetch_node_updates(&client, last_sync_timestamp, logger.clone()).await; + let node_delta_set = lookup::fetch_node_updates(network_graph, &client, last_sync_timestamp, logger.clone()).await; log_info!(logger, "update-fetched node count: {}", node_delta_set.len()); lookup::filter_delta_set(&mut delta_set, logger.clone()); log_info!(logger, "update-filtered channel count: {}", delta_set.len()); diff --git a/src/lookup.rs b/src/lookup.rs index 9a584c4..209e5ec 100644 --- a/src/lookup.rs +++ b/src/lookup.rs @@ -11,6 +11,7 @@ use lightning::util::ser::Readable; use tokio_postgres::Client; use futures::StreamExt; +use 
hex_conservative::DisplayHex; use lightning::{log_debug, log_gossip, log_info}; use lightning::ln::features::NodeFeatures; use lightning::util::logger::Logger; @@ -475,19 +476,45 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, cl log_info!(logger, "Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed()); } -pub(super) async fn fetch_node_updates(client: &Client, last_sync_timestamp: u32, logger: L) -> NodeDeltaSet where L::Target: Logger { +pub(super) async fn fetch_node_updates(network_graph: Arc>, client: &Client, last_sync_timestamp: u32, logger: L) -> NodeDeltaSet where L::Target: Logger { let start = Instant::now(); let last_sync_timestamp_float = last_sync_timestamp as f64; - let mut delta_set = NodeDeltaSet::new(); + let mut delta_set: NodeDeltaSet = { + let read_only_graph = network_graph.read_only(); + read_only_graph.nodes().unordered_iter().flat_map(|(node_id, node_info)| { + let details: NodeDetails = if let Some(details) = node_info.announcement_info.as_ref() { + NodeDetails { + seen: 0, + features: details.features().clone(), + addresses: details.addresses().into_iter().cloned().collect(), + } + } else { + return None; + }; + Some((node_id.clone(), NodeDelta { + latest_details: Some(details), + has_feature_set_changed: false, + has_address_set_changed: false, + last_details_before_seen: None, + })) + }).collect() + }; + + let node_ids: Vec = delta_set.keys().into_iter().map(|id| id.as_slice().to_lower_hex_string()).collect(); + #[cfg(test)] + log_info!(logger, "Node IDs: {:?}", node_ids); // get the latest node updates prior to last_sync_timestamp + let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&node_ids, &last_sync_timestamp_float]; let reference_rows = client.query_raw(" SELECT DISTINCT ON (public_key) public_key, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen, announcement_signed FROM node_announcements - WHERE seen < TO_TIMESTAMP($1) + WHERE 
+ public_key = ANY($1) AND + seen < TO_TIMESTAMP($2) ORDER BY public_key ASC, seen DESC - ", [last_sync_timestamp_float]).await.unwrap(); + ", params).await.unwrap(); let mut pinned_rows = Box::pin(reference_rows); log_info!(logger, "Fetched node announcement reference rows in {:?}", start.elapsed()); @@ -524,12 +551,15 @@ pub(super) async fn fetch_node_updates(client: &Client, last_sync_time // get all the intermediate node updates // (to calculate the set of mutated fields for snapshotting, where intermediate updates may // have been omitted) + let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&node_ids, &last_sync_timestamp_float]; let intermediate_updates = client.query_raw(" SELECT announcement_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM node_announcements - WHERE seen >= TO_TIMESTAMP($1) + WHERE + public_key = ANY($1) AND + seen >= TO_TIMESTAMP($2) ORDER BY public_key ASC, timestamp DESC - ", [last_sync_timestamp_float]).await.unwrap(); + ", params).await.unwrap(); let mut pinned_updates = Box::pin(intermediate_updates); log_info!(logger, "Fetched intermediate node announcement rows in {:?}", start.elapsed()); From f46b5ecf8bbee0e4399c5b516cbc390252d887b4 Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Tue, 2 Jul 2024 00:29:00 -0700 Subject: [PATCH 5/9] Extract snapshot reminder inclusion check method. We will need to determine whether or not a snapshot should include reminders for both channel and node update messages. To prepare for that, we extract the decision logic into its own method. 
--- src/lookup.rs | 36 +++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/src/lookup.rs b/src/lookup.rs index 209e5ec..026bdeb 100644 --- a/src/lookup.rs +++ b/src/lookup.rs @@ -112,6 +112,24 @@ impl Default for DirectedUpdateDelta { } } +fn should_snapshot_include_reminders(last_sync_timestamp: u32, current_timestamp: u64, logger: &L) -> bool where L::Target: Logger { + let current_hour = current_timestamp / 3600; + let current_day = current_timestamp / (24 * 3600); + + log_debug!(logger, "Current day index: {}", current_day); + log_debug!(logger, "Current hour: {}", current_hour); + + // every 5th day at midnight + let is_reminder_hour = (current_hour % 24) == 0; + let is_reminder_day = (current_day % 5) == 0; + + let snapshot_scope = current_timestamp.saturating_sub(last_sync_timestamp as u64); + let is_reminder_scope = snapshot_scope > (50 * 3600); + log_debug!(logger, "Snapshot scope: {}s", snapshot_scope); + + (is_reminder_hour && is_reminder_day) || is_reminder_scope +} + /// Fetch all the channel announcements that are presently in the network graph, regardless of /// whether they had been seen before. 
/// Also include all announcements for which the first update was announced --- @@ -135,23 +153,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaS let current_timestamp = snapshot_reference_timestamp.unwrap_or(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()); log_info!(logger, "Current timestamp: {}", current_timestamp); - let include_reminders = { - let current_hour = current_timestamp / 3600; - let current_day = current_timestamp / (24 * 3600); - - log_debug!(logger, "Current day index: {}", current_day); - log_debug!(logger, "Current hour: {}", current_hour); - - // every 5th day at midnight - let is_reminder_hour = (current_hour % 24) == 0; - let is_reminder_day = (current_day % 5) == 0; - - let snapshot_scope = current_timestamp.saturating_sub(last_sync_timestamp as u64); - let is_reminder_scope = snapshot_scope > (50 * 3600); - log_debug!(logger, "Snapshot scope: {}s", snapshot_scope); - - (is_reminder_hour && is_reminder_day) || is_reminder_scope - }; + let include_reminders = should_snapshot_include_reminders(last_sync_timestamp, current_timestamp, &logger); log_info!(logger, "Obtaining corresponding database entries"); // get all the channel announcements that are currently in the network graph From c9aaf09d04116f631029b6e43252407103d44b70 Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Tue, 2 Jul 2024 02:58:58 -0700 Subject: [PATCH 6/9] Introduce node serialization strategy. With the addition of reminders, we may encounter scenarios where either a bit flip may suffice, instructing the client to look up its latest data, or we may need to serialize all announcement details anew if the client may have already purged the old data. To better distinguish between these scenarios, we introduce a serialization strategy enum that allows serializing either the full announcement, just the mutations, or serving solely as a reminder and serializing nothing at all.
--- src/lib.rs | 92 ++++++++++++++++++++--------------- src/lookup.rs | 113 +++++++++++++++++++++++++++---------------- src/serialization.rs | 26 +++++++++- src/tests/mod.rs | 50 +++++++++++-------- 4 files changed, 178 insertions(+), 103 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 21e21d6..00a5ed5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,7 +26,7 @@ use crate::config::SYMLINK_GRANULARITY_INTERVAL; use crate::lookup::DeltaSet; use crate::persistence::GossipPersister; -use crate::serialization::{SerializationSet, UpdateSerialization}; +use crate::serialization::{MutatedNodeProperties, NodeSerializationStrategy, SerializationSet, UpdateSerialization}; use crate::snapshot::Snapshotter; use crate::types::RGSSLogger; @@ -191,7 +191,7 @@ async fn calculate_delta(network_graph: Arc>, log_info!(logger, "announcement channel count: {}", delta_set.len()); lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp, logger.clone()).await; log_info!(logger, "update-fetched channel count: {}", delta_set.len()); - let node_delta_set = lookup::fetch_node_updates(network_graph, &client, last_sync_timestamp, logger.clone()).await; + let node_delta_set = lookup::fetch_node_updates(network_graph, &client, last_sync_timestamp, snapshot_reference_timestamp, logger.clone()).await; log_info!(logger, "update-fetched node count: {}", node_delta_set.len()); lookup::filter_delta_set(&mut delta_set, logger.clone()); log_info!(logger, "update-filtered channel count: {}", delta_set.len()); @@ -306,6 +306,9 @@ fn serialize_delta(serialization_details: &SerializationSet, s if serialization_version >= 2 { if let Some(node_delta) = serialization_details.node_mutations.get(¤t_node_id) { + let strategy = node_delta.strategy.as_ref().unwrap(); + let mut node_has_update = false; + /* Bitmap: 7: expect extra data after the pubkey (a u16 for the count, and then that number of bytes) @@ -317,51 +320,60 @@ fn serialize_delta(serialization_details: &SerializationSet, s 0: 
used for odd keys */ - if node_delta.has_address_set_changed { - node_address_update_count += 1; - - let address_set = &node_delta.latest_details.as_ref().unwrap().addresses; - let mut address_serialization = Vec::new(); - - // we don't know a priori how many are <= 255 bytes - let mut total_address_count = 0u8; - - for address in address_set.iter() { - if total_address_count == u8::MAX { - // don't serialize more than 255 addresses - break; + match strategy { + NodeSerializationStrategy::Mutated(MutatedNodeProperties { addresses: true, .. }) | NodeSerializationStrategy::Full => { + let address_set = &node_delta.latest_details.as_ref().unwrap().addresses; + let mut address_serialization = Vec::new(); + + // we don't know a priori how many are <= 255 bytes + let mut total_address_count = 0u8; + + for address in address_set.iter() { + if total_address_count == u8::MAX { + // don't serialize more than 255 addresses + break; + } + if let Ok(serialized_length) = u8::try_from(address.serialized_length()) { + total_address_count += 1; + serialized_length.write(&mut address_serialization).unwrap(); + address.write(&mut address_serialization).unwrap(); + }; } - if let Ok(serialized_length) = u8::try_from(address.serialized_length()) { - total_address_count += 1; - serialized_length.write(&mut address_serialization).unwrap(); - address.write(&mut address_serialization).unwrap(); - }; - } - - // signal the presence of node addresses - current_node_delta_serialization[0] |= 1 << 2; - // serialize the actual addresses and count - total_address_count.write(&mut current_node_delta_serialization).unwrap(); - current_node_delta_serialization.append(&mut address_serialization); - } - if node_delta.has_feature_set_changed { - node_feature_update_count += 1; + node_address_update_count += 1; + node_has_update = true; - let latest_features = &node_delta.latest_details.as_ref().unwrap().features; + // signal the presence of node addresses + current_node_delta_serialization[0] |= 1 << 2; 
+ // serialize the actual addresses and count + total_address_count.write(&mut current_node_delta_serialization).unwrap(); + current_node_delta_serialization.append(&mut address_serialization); + }, + _ => {} + } - // are these features among the most common ones? - if let Some(index) = serialization_details.node_announcement_feature_defaults.iter().position(|f| f == latest_features) { - // this feature set is among the 6 defaults - current_node_delta_serialization[0] |= ((index + 1) as u8) << 3; - } else { - current_node_delta_serialization[0] |= 0b_0011_1000; // 7 << 3 - latest_features.write(&mut current_node_delta_serialization).unwrap(); - } + match strategy { + NodeSerializationStrategy::Mutated(MutatedNodeProperties { features: true, .. }) | NodeSerializationStrategy::Full => { + let latest_features = &node_delta.latest_details.as_ref().unwrap().features; + node_feature_update_count += 1; + node_has_update = true; + + // are these features among the most common ones? + if let Some(index) = serialization_details.node_announcement_feature_defaults.iter().position(|f| f == latest_features) { + // this feature set is among the 6 defaults + current_node_delta_serialization[0] |= ((index + 1) as u8) << 3; + } else { + current_node_delta_serialization[0] |= 0b_0011_1000; // 7 << 3 + latest_features.write(&mut current_node_delta_serialization).unwrap(); + } + }, + _ => {} } - if node_delta.has_address_set_changed || node_delta.has_feature_set_changed { + if node_has_update { node_update_count += 1; + } else if let NodeSerializationStrategy::Reminder = strategy { + current_node_delta_serialization[0] |= 1 << 6; } } } diff --git a/src/lookup.rs b/src/lookup.rs index 026bdeb..87a35ae 100644 --- a/src/lookup.rs +++ b/src/lookup.rs @@ -17,7 +17,7 @@ use lightning::ln::features::NodeFeatures; use lightning::util::logger::Logger; use crate::config; -use crate::serialization::MutatedProperties; +use crate::serialization::{MutatedNodeProperties, MutatedProperties, 
NodeSerializationStrategy}; /// The delta set needs to be a BTreeMap so the keys are sorted. /// That way, the scids in the response automatically grow monotonically @@ -58,23 +58,15 @@ pub(super) struct NodeDelta { /// The most recently received, but new-to-the-client, node details pub(super) latest_details: Option, - /// Between last_details_before_seen and latest_details_after_seen, including any potential - /// intermediate updates that are not kept track of here, has the set of features this node - /// supports changed? - pub(super) has_feature_set_changed: bool, - - /// Between last_details_before_seen and latest_details_after_seen, including any potential - /// intermediate updates that are not kept track of here, has the set of socket addresses this - /// node listens on changed? - pub(super) has_address_set_changed: bool, + /// How should this delta be serialized? + pub(super) strategy: Option, /// The most recent node details that the client would have seen already pub(super) last_details_before_seen: Option } pub(super) struct NodeDetails { - #[allow(unused)] - pub(super) seen: u32, + pub(super) seen: Option, pub(super) features: NodeFeatures, pub(super) addresses: HashSet } @@ -94,9 +86,8 @@ impl Default for NodeDelta { fn default() -> Self { Self { latest_details: None, - has_feature_set_changed: false, - has_address_set_changed: false, last_details_before_seen: None, + strategy: None, } } } @@ -478,7 +469,7 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, cl log_info!(logger, "Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed()); } -pub(super) async fn fetch_node_updates(network_graph: Arc>, client: &Client, last_sync_timestamp: u32, logger: L) -> NodeDeltaSet where L::Target: Logger { +pub(super) async fn fetch_node_updates(network_graph: Arc>, client: &Client, last_sync_timestamp: u32, snapshot_reference_timestamp: Option, logger: L) -> NodeDeltaSet where 
L::Target: Logger { let start = Instant::now(); let last_sync_timestamp_float = last_sync_timestamp as f64; @@ -487,7 +478,7 @@ pub(super) async fn fetch_node_updates(network_graph: Arc(network_graph: Arc(network_graph: Arc = unsigned_node_announcement.addresses.into_iter().collect(); NodeDetails { - seen, + seen: Some(seen), features: unsigned_node_announcement.features, addresses: address_set, } @@ -550,10 +540,29 @@ pub(super) async fn fetch_node_updates(network_graph: Arc(network_graph: Arc = None; let mut intermediate_update_count = 0; + let mut has_address_set_changed = false; + let mut has_feature_set_changed = false; + let mut latest_mutation_timestamp = None; while let Some(row_res) = pinned_updates.next().await { let intermediate_update = row_res.unwrap(); intermediate_update_count += 1; @@ -578,37 +590,56 @@ pub(super) async fn fetch_node_updates(network_graph: Arc = unsigned_node_announcement.addresses.into_iter().collect(); - // determine mutations + if previous_node_id != Some(node_id) { + // we're traversing a new node id, initialize the values + has_address_set_changed = false; + has_feature_set_changed = false; + latest_mutation_timestamp = None; + + // this is the highest timestamp value, so set the seen timestamp accordingly + current_node_delta.latest_details.as_mut().map(|mut d| d.seen.replace(current_seen_timestamp)); + } + if let Some(last_seen_update) = current_node_delta.last_details_before_seen.as_ref() { - if unsigned_node_announcement.features != last_seen_update.features { - current_node_delta.has_feature_set_changed = true; - } - if address_set != last_seen_update.addresses { - current_node_delta.has_address_set_changed = true; - } - } else if !is_previously_processed_node_id { - if current_node_delta.last_details_before_seen.is_none() { - if !address_set.is_empty() { - current_node_delta.has_address_set_changed = true; + { // determine the latest mutation timestamp + if address_set != last_seen_update.addresses { + 
has_address_set_changed = true; + if latest_mutation_timestamp.is_none() { + latest_mutation_timestamp = Some(current_seen_timestamp); + } } - if unsigned_node_announcement.features != NodeFeatures::empty() { - current_node_delta.has_feature_set_changed = true; + if unsigned_node_announcement.features != last_seen_update.features { + has_feature_set_changed = true; + if latest_mutation_timestamp.is_none() { + latest_mutation_timestamp = Some(current_seen_timestamp); + } } } - } - if !is_previously_processed_node_id { - (*current_node_delta).latest_details.get_or_insert(NodeDetails { - seen: current_seen_timestamp, - features: unsigned_node_announcement.features, - addresses: address_set, - }); + if current_seen_timestamp >= last_sync_timestamp { + if has_address_set_changed || has_feature_set_changed { + // if the last mutation occurred since the last sync, send the mutation variant + current_node_delta.strategy = Some(NodeSerializationStrategy::Mutated(MutatedNodeProperties { + addresses: has_address_set_changed, + features: has_feature_set_changed, + })); + } + } else if include_reminders && latest_mutation_timestamp.unwrap_or(u32::MAX) <= reminder_inclusion_threshold_timestamp { + // only send a reminder if the latest mutation occurred at least 6 days ago + current_node_delta.strategy = Some(NodeSerializationStrategy::Reminder); + } + + // Note that we completely ignore the case when the last mutation occurred less than + // 6 days ago, but prior to the last sync. In that scenario, we send nothing. 
+ + } else { + // absent any update that was seen prior to the last sync, send the full version + current_node_delta.strategy = Some(NodeSerializationStrategy::Full); } previous_node_id = Some(node_id); diff --git a/src/serialization.rs b/src/serialization.rs index 74c92ca..2cbde46 100644 --- a/src/serialization.rs +++ b/src/serialization.rs @@ -99,6 +99,22 @@ impl UpdateSerialization { } } +pub(super) struct MutatedNodeProperties { + pub(super) addresses: bool, + pub(super) features: bool, +} + +pub(super) enum NodeSerializationStrategy { + /// Only serialize the aspects of the node ID that have been mutated. Skip if they haven't been + Mutated(MutatedNodeProperties), + /// Whether or not the addresses or features have been mutated, serialize this node in full. It + /// may have been purged from the client. + Full, + /// This node ID has been seen recently enough to not have been pruned, and this update serves + /// solely the purpose of delaying any pruning, without applying any mutations + Reminder +} + struct FullUpdateValueHistograms { cltv_expiry_delta: HashMap, htlc_minimum_msat: HashMap, @@ -222,12 +238,18 @@ pub(super) fn serialize_delta_set(channel_delta_set: DeltaSet, node_delta_set: N serialization_set.node_mutations = node_delta_set.into_iter().filter(|(_id, delta)| { // either something changed, or this node is new - delta.has_feature_set_changed || delta.has_address_set_changed || delta.last_details_before_seen.is_none() + delta.strategy.is_some() }).collect(); let mut node_feature_histogram: HashMap<&NodeFeatures, usize> = Default::default(); for (_id, delta) in serialization_set.node_mutations.iter() { - if delta.has_feature_set_changed || delta.last_details_before_seen.is_none() { + // consider either full or feature-mutating serializations for histogram + let mut should_add_to_histogram = matches!(delta.strategy, Some(NodeSerializationStrategy::Full)); + if let Some(NodeSerializationStrategy::Mutated(mutation)) = delta.strategy.as_ref() { + 
should_add_to_histogram = mutation.features; + } + + if should_add_to_histogram { if let Some(latest_details) = delta.latest_details.as_ref() { *node_feature_histogram.entry(&latest_details.features).or_insert(0) += 1; }; diff --git a/src/tests/mod.rs b/src/tests/mod.rs index 2e0e0c1..9a5e22e 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -347,19 +347,41 @@ async fn test_node_announcement_delta_detection() { let timestamp = current_time() - 10; { // seed the db + + { // necessary for the node announcements to be considered relevant + let announcement = generate_channel_announcement(1); + let update_1 = generate_update(1, false, timestamp, 0, 0, 0, 6, 0); + let update_2 = generate_update(1, true, timestamp, 0, 0, 0, 6, 0); + + network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap(); + network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap(); + network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap(); + + receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp))).await.unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp))).await.unwrap(); + } + let mut announcement = generate_node_announcement(None); - receiver.send(GossipMessage::NodeAnnouncement(announcement.clone(), Some(timestamp - 10))).await.unwrap(); - receiver.send(GossipMessage::NodeAnnouncement(announcement.clone(), Some(timestamp - 8))).await.unwrap(); + announcement.contents.timestamp = timestamp - 10; + network_graph_arc.update_node_from_unsigned_announcement(&announcement.contents).unwrap(); + receiver.send(GossipMessage::NodeAnnouncement(announcement.clone(), Some(announcement.contents.timestamp))).await.unwrap(); + announcement.contents.timestamp = timestamp - 8; + network_graph_arc.update_node_from_unsigned_announcement(&announcement.contents).unwrap(); + 
receiver.send(GossipMessage::NodeAnnouncement(announcement.clone(), Some(announcement.contents.timestamp))).await.unwrap(); { let mut current_announcement = generate_node_announcement(Some(SecretKey::from_slice(&[2; 32]).unwrap())); current_announcement.contents.features = NodeFeatures::from_be_bytes(vec![23, 48]); + current_announcement.contents.timestamp = timestamp; + network_graph_arc.update_node_from_unsigned_announcement(¤t_announcement.contents).unwrap(); receiver.send(GossipMessage::NodeAnnouncement(current_announcement, Some(timestamp))).await.unwrap(); } { let mut current_announcement = generate_node_announcement(Some(SecretKey::from_slice(&[3; 32]).unwrap())); current_announcement.contents.features = NodeFeatures::from_be_bytes(vec![22, 49]); + current_announcement.contents.timestamp = timestamp; receiver.send(GossipMessage::NodeAnnouncement(current_announcement, Some(timestamp))).await.unwrap(); } @@ -379,23 +401,11 @@ async fn test_node_announcement_delta_detection() { version: 3, port: 4, }); + announcement.contents.timestamp = timestamp; } + network_graph_arc.update_node_from_unsigned_announcement(&announcement.contents).unwrap(); receiver.send(GossipMessage::NodeAnnouncement(announcement, Some(timestamp))).await.unwrap(); - { // necessary for the node announcements to be considered relevant - let announcement = generate_channel_announcement(1); - let update_1 = generate_update(1, false, timestamp, 0, 0, 0, 6, 0); - let update_2 = generate_update(1, true, timestamp, 0, 0, 0, 6, 0); - - network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap(); - network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap(); - network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap(); - - receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap(); - receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp))).await.unwrap(); - 
receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp))).await.unwrap(); - } - drop(receiver); persister.persist_gossip().await; @@ -409,10 +419,10 @@ async fn test_node_announcement_delta_detection() { clean_test_db().await; assert_eq!(serialization.message_count, 3); - assert_eq!(serialization.node_announcement_count, 3); - assert_eq!(serialization.node_update_count, 3); - assert_eq!(serialization.node_feature_update_count, 3); - assert_eq!(serialization.node_address_update_count, 1); + assert_eq!(serialization.node_announcement_count, 2); + assert_eq!(serialization.node_update_count, 2); + assert_eq!(serialization.node_feature_update_count, 2); + assert_eq!(serialization.node_address_update_count, 2); } /// If a channel has only seen updates in one direction, it should not be announced From 1d7837306ec5b23aec792c9c13d70fba16c5253e Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Wed, 3 Jul 2024 14:02:05 -0700 Subject: [PATCH 7/9] Send full node announcements following old pre-sync updates. This covers the following part of our serialization logic: If the pre-sync update was more than 6 days ago, serialize in full. 
--- src/serialization.rs | 17 ++++++++++++++--- src/tests/mod.rs | 6 +++--- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/src/serialization.rs b/src/serialization.rs index 2cbde46..decc580 100644 --- a/src/serialization.rs +++ b/src/serialization.rs @@ -236,9 +236,20 @@ pub(super) fn serialize_delta_set(channel_delta_set: DeltaSet, node_delta_set: N serialization_set.full_update_defaults = default_update_values; - serialization_set.node_mutations = node_delta_set.into_iter().filter(|(_id, delta)| { - // either something changed, or this node is new - delta.strategy.is_some() + serialization_set.node_mutations = node_delta_set.into_iter().filter_map(|(id, mut delta)| { + if delta.strategy.is_none() { + return None; + } + if let Some(last_details_before_seen) = delta.last_details_before_seen.as_ref() { + if let Some(last_details_seen) = last_details_before_seen.seen { + if last_details_seen <= non_incremental_previous_update_threshold_timestamp { + delta.strategy = Some(NodeSerializationStrategy::Full) + } + } + Some((id, delta)) + } else { + None + } }).collect(); let mut node_feature_histogram: HashMap<&NodeFeatures, usize> = Default::default(); diff --git a/src/tests/mod.rs b/src/tests/mod.rs index 9a5e22e..3fc37cc 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -420,9 +420,9 @@ async fn test_node_announcement_delta_detection() { assert_eq!(serialization.message_count, 3); assert_eq!(serialization.node_announcement_count, 2); - assert_eq!(serialization.node_update_count, 2); - assert_eq!(serialization.node_feature_update_count, 2); - assert_eq!(serialization.node_address_update_count, 2); + assert_eq!(serialization.node_update_count, 1); + assert_eq!(serialization.node_feature_update_count, 1); + assert_eq!(serialization.node_address_update_count, 1); } /// If a channel has only seen updates in one direction, it should not be announced From a9674f70a4c1f115402f95092e4474d7522e8c6f Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Mon, 16 
Sep 2024 12:00:03 -0700 Subject: [PATCH 8/9] Pin tokio-macros to 2.2.0 for Rust 1.63.0. --- .github/workflows/build.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 33d5239..c1f65df 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -30,6 +30,11 @@ jobs: toolchain: ${{ matrix.toolchain }} override: true profile: minimal + - name: Pin dependencies + if: ${{ matrix.toolchain == '1.63.0' }} + run: | + cargo update -p tokio --precise "1.37.0" --verbose + cargo update -p tokio-macros --precise "2.2.0" --verbose - name: Build on Rust ${{ matrix.toolchain }} run: | cargo build --verbose --color always From d7a9d6202f8320af5b2a45510f91e2b4ac33df2c Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Wed, 18 Sep 2024 10:02:09 -0700 Subject: [PATCH 9/9] Pin postgres-types to 0.2.6. --- .github/workflows/build.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index c1f65df..e3ad979 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -35,6 +35,7 @@ jobs: run: | cargo update -p tokio --precise "1.37.0" --verbose cargo update -p tokio-macros --precise "2.2.0" --verbose + cargo update -p postgres-types --precise "0.2.6" --verbose - name: Build on Rust ${{ matrix.toolchain }} run: | cargo build --verbose --color always