Skip to content

Commit

Permalink
Merge pull request #63 from arik-so/2023/10/ordering_fix
Browse files Browse the repository at this point in the history
Fix update recency issue with expanded test coverage
  • Loading branch information
TheBlueMatt authored Oct 19, 2023
2 parents ff9194a + 0aff71f commit 86ebd80
Show file tree
Hide file tree
Showing 8 changed files with 787 additions and 150 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ tokio-postgres = { version = "=0.7.5" }
futures = "0.3"

[dev-dependencies]
lightning = { version = "0.0.117", features = ["_test_utils"] }
lightning-rapid-gossip-sync = { version = "0.0.117" }

[profile.dev]
Expand Down
10 changes: 8 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use lightning::util::ser::Readable;
use lightning_block_sync::http::HttpEndpoint;
use tokio_postgres::Config;

pub(crate) const SCHEMA_VERSION: i32 = 12;
pub(crate) const SCHEMA_VERSION: i32 = 13;
pub(crate) const SYMLINK_GRANULARITY_INTERVAL: u32 = 3600 * 3; // three hours
pub(crate) const MAX_SNAPSHOT_SCOPE: u32 = 3600 * 24 * 21; // three weeks
// generate symlinks based on a 3-hour-granularity
Expand Down Expand Up @@ -143,7 +143,7 @@ pub(crate) fn db_index_creation_query() -> &'static str {
CREATE INDEX IF NOT EXISTS channel_updates_scid_dir_seen_desc_with_id ON channel_updates(short_channel_id ASC, direction ASC, seen DESC) INCLUDE (id);
CREATE UNIQUE INDEX IF NOT EXISTS channel_updates_key ON channel_updates (short_channel_id, direction, timestamp);
CREATE INDEX IF NOT EXISTS channel_updates_seen ON channel_updates(seen);
CREATE INDEX IF NOT EXISTS channel_updates_timestamp_desc ON channel_updates(timestamp DESC);
CREATE INDEX IF NOT EXISTS channel_updates_scid_asc_timestamp_desc ON channel_updates(short_channel_id ASC, timestamp DESC);
"
}

Expand Down Expand Up @@ -282,6 +282,12 @@ pub(crate) async fn upgrade_db(schema: i32, client: &mut tokio_postgres::Client)
tx.execute("UPDATE config SET db_schema = 12 WHERE id = 1", &[]).await.unwrap();
tx.commit().await.unwrap();
}
if schema >= 1 && schema <= 12 {
let tx = client.transaction().await.unwrap();
tx.execute("DROP INDEX IF EXISTS channel_updates_timestamp_desc", &[]).await.unwrap();
tx.execute("UPDATE config SET db_schema = 13 WHERE id = 1", &[]).await.unwrap();
tx.commit().await.unwrap();
}
if schema <= 1 || schema > SCHEMA_VERSION {
panic!("Unknown schema in db: {}, we support up to {}", schema, SCHEMA_VERSION);
}
Expand Down
4 changes: 2 additions & 2 deletions src/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl<L: Deref + Clone + Send + Sync> GossipRouter<L> where L::Target: Logger {
counter.channel_announcements += 1;
}

let gossip_message = GossipMessage::ChannelAnnouncement(msg);
let gossip_message = GossipMessage::ChannelAnnouncement(msg, None);
if let Err(err) = self.sender.try_send(gossip_message) {
let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
Expand All @@ -73,7 +73,7 @@ impl<L: Deref + Clone + Send + Sync> GossipRouter<L> where L::Target: Logger {

fn new_channel_update(&self, msg: ChannelUpdate) {
self.counter.write().unwrap().channel_updates += 1;
let gossip_message = GossipMessage::ChannelUpdate(msg);
let gossip_message = GossipMessage::ChannelUpdate(msg, None);

if let Err(err) = self.sender.try_send(gossip_message) {
let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
Expand Down
2 changes: 1 addition & 1 deletion src/lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
SELECT id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen
FROM channel_updates
WHERE seen >= TO_TIMESTAMP($1)
ORDER BY timestamp DESC
ORDER BY short_channel_id ASC, timestamp DESC
", [last_sync_timestamp_float]).await.unwrap();
let mut pinned_updates = Box::pin(intermediate_updates);
log_info!(logger, "Fetched intermediate rows in {:?}", start.elapsed());
Expand Down
34 changes: 29 additions & 5 deletions src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
}

match &gossip_message {
GossipMessage::ChannelAnnouncement(announcement) => {
GossipMessage::ChannelAnnouncement(announcement, _) => {
let scid = announcement.contents.short_channel_id as i64;

// start with the type prefix, which is already known a priori
Expand All @@ -127,7 +127,7 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
&announcement_signed
])).await.unwrap().unwrap();
}
GossipMessage::ChannelUpdate(update) => {
GossipMessage::ChannelUpdate(update, seen_override) => {
let scid = update.contents.short_channel_id as i64;

let timestamp = update.contents.timestamp as i64;
Expand All @@ -146,10 +146,11 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
let mut update_signed = Vec::new();
update.write(&mut update_signed).unwrap();

tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
.execute("INSERT INTO channel_updates (\
let insertion_statement = if cfg!(test) {
"INSERT INTO channel_updates (\
short_channel_id, \
timestamp, \
seen, \
channel_flags, \
direction, \
disable, \
Expand All @@ -159,9 +160,32 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
fee_proportional_millionths, \
htlc_maximum_msat, \
blob_signed \
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT DO NOTHING", &[
) VALUES ($1, $2, TO_TIMESTAMP($3), $4, $5, $6, $7, $8, $9, $10, $11, $12) ON CONFLICT DO NOTHING"
} else {
"INSERT INTO channel_updates (\
short_channel_id, \
timestamp, \
channel_flags, \
direction, \
disable, \
cltv_expiry_delta, \
htlc_minimum_msat, \
fee_base_msat, \
fee_proportional_millionths, \
htlc_maximum_msat, \
blob_signed \
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT DO NOTHING"
};

// this may not be used outside test cfg
let _seen_timestamp = seen_override.unwrap_or(timestamp as u32) as f64;

tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
.execute(insertion_statement, &[
&scid,
&timestamp,
#[cfg(test)]
&_seen_timestamp,
&(update.contents.flags as i16),
&direction,
&disable,
Expand Down
Loading

0 comments on commit 86ebd80

Please sign in to comment.