Skip to content

Commit

Permalink
Move pending_offers_message to flows.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
shaavan committed Dec 17, 2024
1 parent 309e72c commit d6ee2da
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 86 deletions.
69 changes: 8 additions & 61 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,10 @@ use crate::ln::outbound_payment;
use crate::ln::outbound_payment::{OutboundPayments, PendingOutboundPayment, RetryableInvoiceRequest, SendAlongPathArgs, StaleExpiration};
use crate::offers::invoice::Bolt12Invoice;
use crate::offers::invoice::UnsignedBolt12Invoice;
use crate::offers::invoice_request::InvoiceRequest;
use crate::offers::nonce::Nonce;
use crate::offers::parse::Bolt12SemanticError;
use crate::offers::signer;
#[cfg(async_payments)]
use crate::offers::static_invoice::StaticInvoice;
use crate::onion_message::async_payments::{AsyncPaymentsMessage, HeldHtlcAvailable, ReleaseHeldHtlc, AsyncPaymentsMessageHandler};
use crate::onion_message::messenger::{DefaultMessageRouter, Destination, MessageRouter, MessageSendInstructions, Responder, ResponseInstruction};
use crate::onion_message::offers::OffersMessage;
use crate::onion_message::messenger::{MessageRouter, MessageSendInstructions, Responder, ResponseInstruction};
use crate::sign::{EntropySource, NodeSigner, Recipient, SignerProvider};
use crate::sign::ecdsa::EcdsaChannelSigner;
use crate::util::config::{UserConfig, ChannelConfig, ChannelConfigUpdate};
Expand All @@ -90,8 +85,15 @@ use crate::util::errors::APIError;
#[cfg(feature = "dnssec")]
use crate::onion_message::dns_resolution::{DNSResolverMessage, OMNameResolver};

#[cfg(async_payments)]
use {
crate::offers::static_invoice::StaticInvoice,
crate::onion_message::messenger::Destination,
};

#[cfg(not(c_bindings))]
use {
crate::onion_message::messenger::DefaultMessageRouter,
crate::routing::router::DefaultRouter,
crate::routing::gossip::NetworkGraph,
crate::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters},
Expand Down Expand Up @@ -2140,8 +2142,6 @@ where
//
// Lock order tree:
//
// `pending_offers_messages`
//
// `pending_async_payments_messages`
//
// `total_consistency_lock`
Expand Down Expand Up @@ -2392,10 +2392,6 @@ where
event_persist_notifier: Notifier,
needs_persist_flag: AtomicBool,

#[cfg(not(any(test, feature = "_test_utils")))]
pending_offers_messages: Mutex<Vec<(OffersMessage, MessageSendInstructions)>>,
#[cfg(any(test, feature = "_test_utils"))]
pub(crate) pending_offers_messages: Mutex<Vec<(OffersMessage, MessageSendInstructions)>>,
pending_async_payments_messages: Mutex<Vec<(AsyncPaymentsMessage, MessageSendInstructions)>>,

/// Tracks the message events that are to be broadcasted when we are connected to some peer.
Expand Down Expand Up @@ -3315,7 +3311,6 @@ where
needs_persist_flag: AtomicBool::new(false),
funding_batch_states: Mutex::new(BTreeMap::new()),

pending_offers_messages: Mutex::new(Vec::new()),
pending_async_payments_messages: Mutex::new(Vec::new()),
pending_broadcast_messages: Mutex::new(Vec::new()),

Expand Down Expand Up @@ -9516,10 +9511,6 @@ where
MR::Target: MessageRouter,
L::Target: Logger,
{
fn get_pending_offers_messages(&self) -> MutexGuard<'_, Vec<(OffersMessage, MessageSendInstructions)>> {
self.pending_offers_messages.lock().expect("Mutex is locked by other thread.")
}

#[cfg(feature = "dnssec")]
fn get_pending_dns_onion_messages(&self) -> MutexGuard<'_, Vec<(DNSResolverMessage, MessageSendInstructions)>> {
self.pending_dns_onion_messages.lock().expect("Mutex is locked by other thread.")
Expand Down Expand Up @@ -9629,42 +9620,6 @@ where
self.pending_outbound_payments.release_invoice_requests_awaiting_invoice()
}

fn enqueue_invoice_request(
&self,
invoice_request: InvoiceRequest,
reply_paths: Vec<BlindedMessagePath>,
) -> Result<(), Bolt12SemanticError> {
let mut pending_offers_messages = self.pending_offers_messages.lock().unwrap();
if !invoice_request.paths().is_empty() {
reply_paths
.iter()
.flat_map(|reply_path| invoice_request.paths().iter().map(move |path| (path, reply_path)))
.take(OFFERS_MESSAGE_REQUEST_LIMIT)
.for_each(|(path, reply_path)| {
let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
destination: Destination::BlindedPath(path.clone()),
reply_path: reply_path.clone(),
};
let message = OffersMessage::InvoiceRequest(invoice_request.clone());
pending_offers_messages.push((message, instructions));
});
} else if let Some(node_id) = invoice_request.issuer_signing_pubkey() {
for reply_path in reply_paths {
let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
destination: Destination::Node(node_id),
reply_path,
};
let message = OffersMessage::InvoiceRequest(invoice_request.clone());
pending_offers_messages.push((message, instructions));
}
} else {
debug_assert!(false);
return Err(Bolt12SemanticError::MissingIssuerSigningPubkey);
}

Ok(())
}

fn get_current_blocktime(&self) -> Duration {
Duration::from_secs(self.highest_seen_timestamp.load(Ordering::Acquire) as u64)
}
Expand Down Expand Up @@ -9765,13 +9720,6 @@ where
}
}

/// Defines the maximum number of [`OffersMessage`] including different reply paths to be sent
/// along different paths.
/// Sending multiple requests increases the chances of successful delivery in case some
/// paths are unavailable. However, only one invoice for a given [`PaymentId`] will be paid,
/// even if multiple invoices are received.
pub const OFFERS_MESSAGE_REQUEST_LIMIT: usize = 10;

impl<M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, MR: Deref, L: Deref> ChannelManager<M, T, ES, NS, SP, F, R, MR, L>
where
M::Target: chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
Expand Down Expand Up @@ -13165,7 +13113,6 @@ where

funding_batch_states: Mutex::new(BTreeMap::new()),

pending_offers_messages: Mutex::new(Vec::new()),
pending_async_payments_messages: Mutex::new(Vec::new()),

pending_broadcast_messages: Mutex::new(Vec::new()),
Expand Down
14 changes: 7 additions & 7 deletions lightning/src/ln/offers_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1325,7 +1325,7 @@ fn fails_authentication_when_handling_invoice_request() {
expect_recent_payment!(david, RecentPaymentDetails::AwaitingInvoice, payment_id);

connect_peers(david, alice);
match &mut david.node.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
match &mut david.offers_handler.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
MessageSendInstructions::WithSpecifiedReplyPath { destination, .. } =>
*destination = Destination::Node(alice_id),
_ => panic!(),
Expand All @@ -1350,7 +1350,7 @@ fn fails_authentication_when_handling_invoice_request() {
.unwrap();
expect_recent_payment!(david, RecentPaymentDetails::AwaitingInvoice, payment_id);

match &mut david.node.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
match &mut david.offers_handler.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
MessageSendInstructions::WithSpecifiedReplyPath { destination, .. } =>
*destination = Destination::BlindedPath(invalid_path),
_ => panic!(),
Expand Down Expand Up @@ -1430,7 +1430,7 @@ fn fails_authentication_when_handling_invoice_for_offer() {

// Don't send the invoice request, but grab its reply path to use with a different request.
let invalid_reply_path = {
let mut pending_offers_messages = david.node.pending_offers_messages.lock().unwrap();
let mut pending_offers_messages = david.offers_handler.pending_offers_messages.lock().unwrap();
let pending_invoice_request = pending_offers_messages.pop().unwrap();
pending_offers_messages.clear();
match pending_invoice_request.1 {
Expand All @@ -1447,7 +1447,7 @@ fn fails_authentication_when_handling_invoice_for_offer() {
// Swap out the reply path to force authentication to fail when handling the invoice since it
// will be sent over the wrong blinded path.
{
let mut pending_offers_messages = david.node.pending_offers_messages.lock().unwrap();
let mut pending_offers_messages = david.offers_handler.pending_offers_messages.lock().unwrap();
let mut pending_invoice_request = pending_offers_messages.first_mut().unwrap();
match &mut pending_invoice_request.1 {
MessageSendInstructions::WithSpecifiedReplyPath { reply_path, .. } =>
Expand Down Expand Up @@ -1534,7 +1534,7 @@ fn fails_authentication_when_handling_invoice_for_refund() {
let expected_invoice = alice.offers_handler.request_refund_payment(&refund).unwrap();

connect_peers(david, alice);
match &mut alice.node.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
match &mut alice.offers_handler.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
MessageSendInstructions::WithSpecifiedReplyPath { destination, .. } =>
*destination = Destination::Node(david_id),
_ => panic!(),
Expand Down Expand Up @@ -1565,7 +1565,7 @@ fn fails_authentication_when_handling_invoice_for_refund() {

let expected_invoice = alice.offers_handler.request_refund_payment(&refund).unwrap();

match &mut alice.node.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
match &mut alice.offers_handler.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
MessageSendInstructions::WithSpecifiedReplyPath { destination, .. } =>
*destination = Destination::BlindedPath(invalid_path),
_ => panic!(),
Expand Down Expand Up @@ -2156,7 +2156,7 @@ fn fails_paying_invoice_with_unknown_required_features() {
destination: Destination::BlindedPath(reply_path),
};
let message = OffersMessage::Invoice(invoice);
alice.node.pending_offers_messages.lock().unwrap().push((message, instructions));
alice.offers_handler.pending_offers_messages.lock().unwrap().push((message, instructions));

let onion_message = alice.onion_messenger.next_onion_message_for_peer(charlie_id).unwrap();
charlie.onion_messenger.handle_onion_message(alice_id, &onion_message);
Expand Down
76 changes: 58 additions & 18 deletions lightning/src/offers/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ use crate::blinded_path::payment::{
BlindedPaymentPath, Bolt12OfferContext, Bolt12RefundContext, PaymentContext,
};
use crate::events::PaymentFailureReason;
use crate::ln::channelmanager::{
Bolt12PaymentError, PaymentId, Verification, OFFERS_MESSAGE_REQUEST_LIMIT,
};
use crate::ln::channelmanager::{Bolt12PaymentError, PaymentId, Verification};
use crate::ln::inbound_payment;
use crate::ln::outbound_payment::{Retry, RetryableInvoiceRequest, StaleExpiration};
use crate::offers::invoice::{
Expand Down Expand Up @@ -77,11 +75,6 @@ use {
///
/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
pub trait OffersMessageCommons {
/// Get pending offers messages
fn get_pending_offers_messages(
&self,
) -> MutexGuard<'_, Vec<(OffersMessage, MessageSendInstructions)>>;

#[cfg(feature = "dnssec")]
/// Get pending DNS onion messages
fn get_pending_dns_onion_messages(
Expand Down Expand Up @@ -172,11 +165,6 @@ pub trait OffersMessageCommons {
&self,
) -> Vec<(PaymentId, RetryableInvoiceRequest)>;

/// Enqueue invoice request
fn enqueue_invoice_request(
&self, invoice_request: InvoiceRequest, reply_paths: Vec<BlindedMessagePath>,
) -> Result<(), Bolt12SemanticError>;

/// Get the current time determined by highest seen timestamp
fn get_current_blocktime(&self) -> Duration;

Expand Down Expand Up @@ -577,6 +565,11 @@ where

message_router: MR,

#[cfg(not(any(test, feature = "_test_utils")))]
pending_offers_messages: Mutex<Vec<(OffersMessage, MessageSendInstructions)>>,
#[cfg(any(test, feature = "_test_utils"))]
pub(crate) pending_offers_messages: Mutex<Vec<(OffersMessage, MessageSendInstructions)>>,

#[cfg(feature = "_test_utils")]
/// In testing, it is useful be able to forge a name -> offer mapping so that we can pay an
/// offer generated in the test.
Expand Down Expand Up @@ -609,9 +602,13 @@ where
inbound_payment_key: expanded_inbound_key,
our_network_pubkey,
secp_ctx,
entropy_source,

commons,

message_router,
entropy_source,

pending_offers_messages: Mutex::new(Vec::new()),
#[cfg(feature = "_test_utils")]
testing_dnssec_proof_offer_resolution_override: Mutex::new(new_hash_map()),
logger,
Expand Down Expand Up @@ -644,6 +641,13 @@ where
/// [`Refund`]: crate::offers::refund
pub const MAX_SHORT_LIVED_RELATIVE_EXPIRY: Duration = Duration::from_secs(60 * 60 * 24);

/// Defines the maximum number of [`OffersMessage`] including different reply paths to be sent
/// along different paths.
/// Sending multiple requests increases the chances of successful delivery in case some
/// paths are unavailable. However, only one invoice for a given [`PaymentId`] will be paid,
/// even if multiple invoices are received.
pub const OFFERS_MESSAGE_REQUEST_LIMIT: usize = 10;

impl<ES: Deref, OMC: Deref, MR: Deref, L: Deref> OffersMessageFlow<ES, OMC, MR, L>
where
ES::Target: EntropySource,
Expand Down Expand Up @@ -722,6 +726,42 @@ where
)
.and_then(|paths| (!paths.is_empty()).then(|| paths).ok_or(()))
}

fn enqueue_invoice_request(
&self, invoice_request: InvoiceRequest, reply_paths: Vec<BlindedMessagePath>,
) -> Result<(), Bolt12SemanticError> {
let mut pending_offers_messages = self.pending_offers_messages.lock().unwrap();
if !invoice_request.paths().is_empty() {
reply_paths
.iter()
.flat_map(|reply_path| {
invoice_request.paths().iter().map(move |path| (path, reply_path))
})
.take(OFFERS_MESSAGE_REQUEST_LIMIT)
.for_each(|(path, reply_path)| {
let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
destination: Destination::BlindedPath(path.clone()),
reply_path: reply_path.clone(),
};
let message = OffersMessage::InvoiceRequest(invoice_request.clone());
pending_offers_messages.push((message, instructions));
});
} else if let Some(node_id) = invoice_request.issuer_signing_pubkey() {
for reply_path in reply_paths {
let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
destination: Destination::Node(node_id),
reply_path,
};
let message = OffersMessage::InvoiceRequest(invoice_request.clone());
pending_offers_messages.push((message, instructions));
}
} else {
debug_assert!(false);
return Err(Bolt12SemanticError::MissingIssuerSigningPubkey);
}

Ok(())
}
}

impl<ES: Deref, OMC: Deref, MR: Deref, L: Deref> OffersMessageFlow<ES, OMC, MR, L>
Expand Down Expand Up @@ -776,7 +816,7 @@ where

create_pending_payment(&invoice_request, nonce)?;

self.commons.enqueue_invoice_request(invoice_request, reply_paths)
self.enqueue_invoice_request(invoice_request, reply_paths)
}
}

Expand Down Expand Up @@ -1044,7 +1084,7 @@ where
});
match self.create_blinded_paths(context) {
Ok(reply_paths) => {
match self.commons.enqueue_invoice_request(invoice_request, reply_paths) {
match self.enqueue_invoice_request(invoice_request, reply_paths) {
Ok(_) => {},
Err(_) => {
log_warn!(
Expand All @@ -1068,7 +1108,7 @@ where
}

fn release_pending_messages(&self) -> Vec<(OffersMessage, MessageSendInstructions)> {
core::mem::take(&mut self.commons.get_pending_offers_messages())
core::mem::take(&mut self.pending_offers_messages.lock().unwrap())
}
}

Expand Down Expand Up @@ -1398,7 +1438,7 @@ where
.create_blinded_paths(context)
.map_err(|_| Bolt12SemanticError::MissingPaths)?;

let mut pending_offers_messages = self.commons.get_pending_offers_messages();
let mut pending_offers_messages = self.pending_offers_messages.lock().unwrap();
if refund.paths().is_empty() {
for reply_path in reply_paths {
let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
Expand Down

0 comments on commit d6ee2da

Please sign in to comment.