Skip to content

Commit

Permalink
Merge pull request fedimint#6790 from m1sterc001guy/gateway_v1_crate
Browse files Browse the repository at this point in the history
refactor: move LNv1 module into its own crate
  • Loading branch information
m1sterc001guy authored Feb 4, 2025
2 parents b2693b7 + 8bab787 commit eb2e333
Show file tree
Hide file tree
Showing 19 changed files with 505 additions and 329 deletions.
27 changes: 27 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ members = [
"modules/fedimint-empty-client",
"modules/fedimint-empty-common",
"modules/fedimint-empty-server",
"modules/fedimint-gw-client",
"modules/fedimint-ln-client",
"modules/fedimint-ln-common",
"modules/fedimint-ln-server",
Expand Down Expand Up @@ -136,6 +137,7 @@ fedimint-dummy-common = { path = "./modules/fedimint-dummy-common", version = "=
fedimint-dummy-server = { path = "./modules/fedimint-dummy-server", version = "=0.7.0-alpha" }
fedimint-empty-common = { path = "./modules/fedimint-empty-common", version = "=0.7.0-alpha" }
fedimint-eventlog = { path = "./fedimint-eventlog", version = "=0.7.0-alpha" }
fedimint-gw-client = { path = "./modules/fedimint-gw-client", version = "=0.7.0-alpha" }
fedimint-lightning = { package = "fedimint-lightning", path = "./gateway/fedimint-lightning", version = "=0.7.0-alpha" }
fedimint-lnv2-client = { path = "./modules/fedimint-lnv2-client", version = "=0.7.0-alpha" }
fedimint-lnv2-common = { path = "./modules/fedimint-lnv2-common", version = "=0.7.0-alpha" }
Expand Down
1 change: 1 addition & 0 deletions fedimint-eventlog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ async-trait = { workspace = true }
fedimint-core = { workspace = true }
fedimint-logging = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["time", "macros", "rt"] }
Expand Down
98 changes: 97 additions & 1 deletion fedimint-eventlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ use fedimint_core::db::{
};
use fedimint_core::encoding::{Decodable, Encodable};
use fedimint_core::task::{MaybeSend, MaybeSync};
use fedimint_core::{apply, async_trait_maybe_send, impl_db_lookup, impl_db_record};
use fedimint_core::{apply, async_trait_maybe_send, impl_db_lookup, impl_db_record, Amount};
use fedimint_logging::LOG_CLIENT_EVENT_LOG;
use futures::{Future, StreamExt};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, watch};
use tracing::{debug, trace};
Expand Down Expand Up @@ -440,6 +441,101 @@ where
}
}

/// Filters the `PersistedLogEntries` by the `EventKind` and
/// `ModuleKind`.
pub fn filter_events_by_kind<'a, I>(
all_events: I,
module_kind: ModuleKind,
event_kind: EventKind,
) -> impl Iterator<Item = &'a PersistedLogEntry> + 'a
where
I: IntoIterator<Item = &'a PersistedLogEntry> + 'a,
{
all_events.into_iter().filter(move |e| {
if let Some((m, _)) = &e.module {
e.event_kind == event_kind && *m == module_kind
} else {
false
}
})
}

/// Joins two sets of events on a predicate.
///
/// This function computes a "nested loop join" by first computing the cross
/// product of the start event vector and the success/failure event vectors. The
/// resulting cartesian product is then filtered according to the join predicate
/// supplied in the parameters.
///
/// This function is intended for small data sets. If the data set relations
/// grow, this function should implement a different join algorithm or be moved
/// out of the gateway.
pub fn join_events<'a, L, R, Res>(
events_l: &'a [&PersistedLogEntry],
events_r: &'a [&PersistedLogEntry],
predicate: impl Fn(L, R, u64) -> Option<Res> + 'a,
) -> impl Iterator<Item = Res> + 'a
where
L: Event,
R: Event,
{
events_l
.iter()
.cartesian_product(events_r)
.filter_map(move |(l, r)| {
if let Some(latency) = r.timestamp.checked_sub(l.timestamp) {
let event_l: L =
serde_json::from_value(l.value.clone()).expect("could not parse JSON");
let event_r: R =
serde_json::from_value(r.value.clone()).expect("could not parse JSON");
predicate(event_l, event_r, latency)
} else {
None
}
})
}

/// Helper struct for storing computed data about outgoing and incoming
/// payments.
#[derive(Debug, Default)]
pub struct StructuredPaymentEvents {
pub latencies: Vec<u64>,
pub fees: Vec<Amount>,
pub latencies_failure: Vec<u64>,
}

impl StructuredPaymentEvents {
pub fn new(
success_stats: &[(u64, Amount)],
failure_stats: Vec<u64>,
) -> StructuredPaymentEvents {
let mut events = StructuredPaymentEvents {
latencies: success_stats.iter().map(|(l, _)| *l).collect(),
fees: success_stats.iter().map(|(_, f)| *f).collect(),
latencies_failure: failure_stats,
};
events.sort();
events
}

/// Combines this `StructuredPaymentEvents` with the `other`
/// `StructuredPaymentEvents` by appending all of the internal vectors.
pub fn combine(&mut self, other: &mut StructuredPaymentEvents) {
self.latencies.append(&mut other.latencies);
self.fees.append(&mut other.fees);
self.latencies_failure.append(&mut other.latencies_failure);
self.sort();
}

/// Sorts this `StructuredPaymentEvents` by sorting all of the internal
/// vectors.
fn sort(&mut self) {
self.latencies.sort_unstable();
self.fees.sort_unstable();
self.latencies_failure.sort_unstable();
}
}

#[cfg(test)]
mod tests {
use std::sync::atomic::AtomicU8;
Expand Down
2 changes: 1 addition & 1 deletion gateway/fedimint-lightning/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ pub struct CloseChannelsWithPeerRequest {
pub pubkey: secp256k1::PublicKey,
}

// Trait that specifies how to interact with the gateway's lightning node.
// TODO: Move into `fedimint-gateway-v2` crate
#[async_trait]
pub trait LightningV2Manager: Debug + Send + Sync {
async fn contains_incoming_contract(&self, payment_image: PaymentImage) -> bool;
Expand Down
3 changes: 2 additions & 1 deletion gateway/ln-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ fedimint-bip39 = { version = "=0.7.0-alpha", path = "../../fedimint-bip39" }
fedimint-client = { path = "../../fedimint-client", version = "=0.7.0-alpha" }
fedimint-core = { workspace = true }
fedimint-eventlog = { workspace = true }
fedimint-gw-client = { workspace = true }
fedimint-lightning = { path = "../fedimint-lightning", version = "=0.7.0-alpha" }
fedimint-ln-client = { workspace = true }
fedimint-ln-common = { workspace = true }
Expand All @@ -51,7 +52,6 @@ fedimint-wallet-client = { workspace = true }
futures = { workspace = true }
futures-util = { workspace = true }
hex = { workspace = true }
itertools = { workspace = true }
lightning-invoice = { workspace = true }
lockable = "0.1.1"
prost = "0.13.4"
Expand Down Expand Up @@ -82,6 +82,7 @@ fedimint-lnv2-server = { workspace = true }
fedimint-testing = { workspace = true }
fedimint-unknown-common = { workspace = true }
fedimint-unknown-server = { workspace = true }
itertools = { workspace = true }

[build-dependencies]
fedimint-build = { workspace = true }
Expand Down
4 changes: 2 additions & 2 deletions gateway/ln-gateway/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ use fedimint_core::config::FederationId;
use fedimint_core::core::ModuleKind;
use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
use fedimint_core::module::registry::ModuleDecoderRegistry;
use fedimint_gw_client::GatewayClientInit;

use crate::db::{FederationConfig, GatewayDbExt};
use crate::error::AdminGatewayError;
use crate::gateway_module_v2::GatewayClientInitV2;
use crate::state_machine::GatewayClientInit;
use crate::{AdminResult, Gateway};

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -72,7 +72,7 @@ impl GatewayClientBuilder {
if gateway.is_running_lnv1() {
registry.attach(GatewayClientInit {
federation_index,
gateway: gateway.clone(),
lightning_manager: gateway.clone(),
});
}

Expand Down
2 changes: 1 addition & 1 deletion gateway/ln-gateway/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ use axum::response::{IntoResponse, Response};
use fedimint_core::config::{FederationId, FederationIdPrefix};
use fedimint_core::envs::is_env_var_set;
use fedimint_core::fmt_utils::OptStacktrace;
use fedimint_gw_client::pay::OutgoingPaymentError;
use fedimint_lightning::LightningRpcError;
use reqwest::StatusCode;
use thiserror::Error;
use tracing::error;

use crate::envs::FM_DEBUG_GATEWAY_ENV;
use crate::state_machine::pay::OutgoingPaymentError;

/// Errors that unauthenticated endpoints can encounter. For privacy reasons,
/// the error messages are intended to be redacted before returning to the
Expand Down
113 changes: 1 addition & 112 deletions gateway/ln-gateway/src/events.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,18 @@
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::time::{SystemTime, UNIX_EPOCH};

use fedimint_client::ClientHandle;
use fedimint_core::core::ModuleKind;
use fedimint_core::util::{get_average, get_median};
use fedimint_core::Amount;
use fedimint_eventlog::{
DBTransactionEventLogExt, Event, EventKind, EventLogId, PersistedLogEntry,
};
use fedimint_mint_client::event::{OOBNotesReissued, OOBNotesSpent};
use fedimint_wallet_client::events::{DepositConfirmed, WithdrawRequest};
use itertools::Itertools;

use crate::gateway_module_v2::events::{
CompleteLightningPaymentSucceeded, IncomingPaymentFailed, IncomingPaymentStarted,
IncomingPaymentSucceeded, OutgoingPaymentFailed, OutgoingPaymentStarted,
OutgoingPaymentSucceeded,
};
use crate::rpc::PaymentStats;

pub const ALL_GATEWAY_EVENTS: [EventKind; 11] = [
OutgoingPaymentStarted::KIND,
Expand Down Expand Up @@ -108,109 +103,3 @@ pub async fn get_events_for_duration(
batch_start = batch_start.saturating_add(BATCH_SIZE);
}
}

/// Filters the given `PersistedLogEntry` slice by the `EventKind` and
/// `ModuleKind`.
pub(crate) fn filter_events<'a, I>(
all_events: I,
event_kind: EventKind,
module_kind: ModuleKind,
) -> impl Iterator<Item = &'a PersistedLogEntry> + 'a
where
I: IntoIterator<Item = &'a PersistedLogEntry> + 'a,
{
all_events.into_iter().filter(move |e| {
if let Some((m, _)) = &e.module {
e.event_kind == event_kind && *m == module_kind
} else {
false
}
})
}

/// Joins two sets of events on a predicate.
///
/// This function computes a "nested loop join" by first computing the cross
/// product of the start event vector and the success/failure event vectors. The
/// resulting cartesian product is then filtered according to the join predicate
/// supplied in the parameters.
///
/// This function is intended for small data sets. If the data set relations
/// grow, this function should implement a different join algorithm or be moved
/// out of the gateway.
pub(crate) fn join_events<'a, L, R, Res>(
events_l: &'a [&PersistedLogEntry],
events_r: &'a [&PersistedLogEntry],
predicate: impl Fn(L, R, u64) -> Option<Res> + 'a,
) -> impl Iterator<Item = Res> + 'a
where
L: Event,
R: Event,
{
events_l
.iter()
.cartesian_product(events_r)
.filter_map(move |(l, r)| {
if let Some(latency) = r.timestamp.checked_sub(l.timestamp) {
let event_l: L =
serde_json::from_value(l.value.clone()).expect("could not parse JSON");
let event_r: R =
serde_json::from_value(r.value.clone()).expect("could not parse JSON");
predicate(event_l, event_r, latency)
} else {
None
}
})
}

/// Helper struct for storing computed data about outgoing and incoming
/// payments.
#[derive(Debug, Default)]
pub struct StructuredPaymentEvents {
latencies: Vec<u64>,
fees: Vec<Amount>,
latencies_failure: Vec<u64>,
}

impl StructuredPaymentEvents {
pub fn new(
success_stats: &[(u64, Amount)],
failure_stats: Vec<u64>,
) -> StructuredPaymentEvents {
let mut events = StructuredPaymentEvents {
latencies: success_stats.iter().map(|(l, _)| *l).collect(),
fees: success_stats.iter().map(|(_, f)| *f).collect(),
latencies_failure: failure_stats,
};
events.sort();
events
}

/// Combines this `StructuredPaymentEvents` with the `other`
/// `StructuredPaymentEvents` by appending all of the internal vectors.
pub fn combine(&mut self, other: &mut StructuredPaymentEvents) {
self.latencies.append(&mut other.latencies);
self.fees.append(&mut other.fees);
self.latencies_failure.append(&mut other.latencies_failure);
self.sort();
}

/// Sorts this `StructuredPaymentEvents` by sorting all of the internal
/// vectors.
fn sort(&mut self) {
self.latencies.sort_unstable();
self.fees.sort_unstable();
self.latencies_failure.sort_unstable();
}

/// Computes the payment statistics for the given input data.
pub fn compute_payment_stats(&self) -> PaymentStats {
PaymentStats {
average_latency: get_average(&self.latencies).map(Duration::from_micros),
median_latency: get_median(&self.latencies).map(Duration::from_micros),
total_fees: Amount::from_msats(self.fees.iter().map(|a| a.msats).sum()),
total_success: self.latencies.len(),
total_failure: self.latencies_failure.len(),
}
}
}
Loading

0 comments on commit eb2e333

Please sign in to comment.