Skip to content

Commit de24b92

Browse files
committed
Implement Flow for OffersMessageFlow
1 parent 498afb8 commit de24b92

File tree

3 files changed

+288
-7
lines changed

3 files changed

+288
-7
lines changed

lightning/src/ln/channel.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1807,7 +1807,7 @@ pub(super) struct ChannelContext<SP: Deref> where SP::Target: SignerProvider {
18071807
/// Either the height at which this channel was created or the height at which it was last
18081808
/// serialized if it was serialized by versions prior to 0.0.103.
18091809
/// We use this to close if funding is never broadcasted.
1810-
pub(super) channel_creation_height: u32,
1810+
pub(crate) channel_creation_height: u32,
18111811

18121812
counterparty_dust_limit_satoshis: u64,
18131813

lightning/src/ln/channelmanager.rs

+22-5
Original file line numberDiff line numberDiff line change
@@ -1349,11 +1349,11 @@ impl Readable for Option<RAAMonitorUpdateBlockingAction> {
13491349
}
13501350

13511351
/// State we hold per-peer.
1352-
pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
1352+
pub(crate) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
13531353
/// `channel_id` -> `Channel`
13541354
///
13551355
/// Holds all channels where the peer is the counterparty.
1356-
pub(super) channel_by_id: HashMap<ChannelId, Channel<SP>>,
1356+
pub(crate) channel_by_id: HashMap<ChannelId, Channel<SP>>,
13571357
/// `temporary_channel_id` -> `InboundChannelRequest`.
13581358
///
13591359
/// When manual channel acceptance is enabled, this holds all unaccepted inbound channels where
@@ -1362,7 +1362,7 @@ pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
13621362
/// the channel is rejected, then the entry is simply removed.
13631363
pub(super) inbound_channel_request_by_id: HashMap<ChannelId, InboundChannelRequest>,
13641364
/// The latest `InitFeatures` we heard from the peer.
1365-
latest_features: InitFeatures,
1365+
pub(crate) latest_features: InitFeatures,
13661366
/// Messages to send to the peer - pushed to in the same lock that they are generated in (except
13671367
/// for broadcast messages, where ordering isn't as strict).
13681368
pub(super) pending_msg_events: Vec<MessageSendEvent>,
@@ -8035,7 +8035,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
80358035

80368036
let per_peer_state = self.per_peer_state.read().unwrap();
80378037
let peer_state_mutex = per_peer_state.get(counterparty_node_id)
8038-
.ok_or_else(|| {
8038+
.ok_or_else(|| {
80398039
debug_assert!(false);
80408040
MsgHandleErrInternal::send_err_msg_no_close(
80418041
format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id),
@@ -10202,7 +10202,7 @@ macro_rules! create_refund_builder { ($self: ident, $builder: ty) => {
1020210202
/// Sending multiple requests increases the chances of successful delivery in case some
1020310203
/// paths are unavailable. However, only one invoice for a given [`PaymentId`] will be paid,
1020410204
/// even if multiple invoices are received.
10205-
const OFFERS_MESSAGE_REQUEST_LIMIT: usize = 10;
10205+
pub(crate) const OFFERS_MESSAGE_REQUEST_LIMIT: usize = 10;
1020610206

1020710207
impl<M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, MR: Deref, L: Deref> ChannelManager<M, T, ES, NS, SP, F, R, MR, L>
1020810208
where
@@ -10812,6 +10812,23 @@ where
1081210812
.and_then(|paths| (!paths.is_empty()).then(|| paths).ok_or(()))
1081310813
}
1081410814

10815+
fn peer_for_blinded_path(&self) -> Vec<MessageForwardNode> {
10816+
self.per_peer_state.read().unwrap()
10817+
.iter()
10818+
.map(|(node_id, peer_state)| (node_id, peer_state.lock().unwrap()))
10819+
.filter(|(_, peer)| peer.is_connected)
10820+
.filter(|(_, peer)| peer.latest_features.supports_onion_messages())
10821+
.map(|(node_id, peer)| MessageForwardNode {
10822+
node_id: *node_id,
10823+
short_channel_id: peer.channel_by_id
10824+
.iter()
10825+
.filter(|(_, channel)| channel.context().is_usable())
10826+
.min_by_key(|(_, channel)| channel.context().channel_creation_height)
10827+
.and_then(|(_, channel)| channel.context().get_short_channel_id()),
10828+
})
10829+
.collect::<Vec<_>>()
10830+
}
10831+
1081510832
/// Creates multi-hop blinded payment paths for the given `amount_msats` by delegating to
1081610833
/// [`Router::create_blinded_payment_paths`].
1081710834
fn create_blinded_payment_paths(

lightning/src/offers/flow.rs

+265-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ use crate::blinded_path::message::{
1616
use crate::blinded_path::payment::BlindedPaymentPath;
1717
use crate::chain;
1818
use crate::chain::transaction::TransactionData;
19-
use crate::ln::channelmanager::PaymentId;
19+
use crate::ln::channelmanager::{
20+
PaymentId, MAX_SHORT_LIVED_RELATIVE_EXPIRY, OFFERS_MESSAGE_REQUEST_LIMIT,
21+
};
2022
use crate::ln::inbound_payment;
2123
use crate::offers::invoice::{Bolt12Invoice, DerivedSigningPubkey, InvoiceBuilder};
2224
use crate::offers::invoice_request::{InvoiceRequest, InvoiceRequestBuilder};
@@ -167,6 +169,17 @@ where
167169
self.our_network_pubkey
168170
}
169171

172+
fn duration_since_epoch(&self) -> Duration {
173+
#[cfg(not(feature = "std"))]
174+
let now = Duration::from_secs(self.highest_seen_timestamp.load(Ordering::Acquire) as u64);
175+
#[cfg(feature = "std")]
176+
let now = std::time::SystemTime::now()
177+
.duration_since(std::time::SystemTime::UNIX_EPOCH)
178+
.expect("SystemTime::now() should come after SystemTime::UNIX_EPOCH");
179+
180+
now
181+
}
182+
170183
fn best_block_updated(&self, header: &Header) {
171184
macro_rules! max_time {
172185
($timestamp: expr) => {
@@ -189,6 +202,257 @@ where
189202
}
190203
}
191204

205+
impl<ES: Deref, MR: Deref> Flow for OffersMessageFlow<ES, MR>
206+
where
207+
ES::Target: EntropySource,
208+
MR::Target: MessageRouter,
209+
{
210+
fn create_offer_builder(
211+
&self, nonce: Nonce,
212+
) -> Result<OfferBuilder<DerivedMetadata, secp256k1::All>, Bolt12SemanticError> {
213+
let node_id = self.get_our_node_id();
214+
let expanded_key = &self.inbound_payment_key;
215+
let secp_ctx = &self.secp_ctx;
216+
217+
let builder = OfferBuilder::deriving_signing_pubkey(node_id, expanded_key, nonce, secp_ctx)
218+
.chain_hash(self.chain_hash);
219+
220+
Ok(builder)
221+
}
222+
223+
fn create_refund_builder(
224+
&self, amount_msats: u64, absolute_expiry: Duration, payment_id: PaymentId, nonce: Nonce,
225+
) -> Result<RefundBuilder<secp256k1::All>, Bolt12SemanticError> {
226+
let node_id = self.get_our_node_id();
227+
let expanded_key = &self.inbound_payment_key;
228+
let secp_ctx = &self.secp_ctx;
229+
230+
let builder = RefundBuilder::deriving_signing_pubkey(
231+
node_id,
232+
expanded_key,
233+
nonce,
234+
secp_ctx,
235+
amount_msats,
236+
payment_id,
237+
)?
238+
.chain_hash(self.chain_hash)
239+
.absolute_expiry(absolute_expiry);
240+
241+
Ok(builder)
242+
}
243+
244+
fn create_invoice_request_builder<'a>(
245+
&'a self, offer: &'a Offer, nonce: Nonce, quantity: Option<u64>, amount_msats: Option<u64>,
246+
payer_note: Option<String>, human_readable_name: Option<HumanReadableName>,
247+
payment_id: PaymentId,
248+
) -> Result<InvoiceRequestBuilder<'a, 'a, secp256k1::All>, Bolt12SemanticError> {
249+
let expanded_key = &self.inbound_payment_key;
250+
let secp_ctx = &self.secp_ctx;
251+
252+
let builder = offer
253+
.request_invoice(expanded_key, nonce, secp_ctx, payment_id)?
254+
.chain_hash(self.chain_hash)?;
255+
256+
let builder = match quantity {
257+
None => builder,
258+
Some(quantity) => builder.quantity(quantity)?,
259+
};
260+
let builder = match amount_msats {
261+
None => builder,
262+
Some(amount_msats) => builder.amount_msats(amount_msats)?,
263+
};
264+
let builder = match payer_note {
265+
None => builder,
266+
Some(payer_note) => builder.payer_note(payer_note),
267+
};
268+
let builder = match human_readable_name {
269+
None => builder,
270+
Some(hrn) => builder.sourced_from_human_readable_name(hrn),
271+
};
272+
273+
Ok(builder.into())
274+
}
275+
276+
fn create_invoice_builder<'a>(
277+
&'a self, refund: &'a Refund, payment_paths: Vec<BlindedPaymentPath>,
278+
payment_hash: PaymentHash,
279+
) -> Result<InvoiceBuilder<'a, DerivedSigningPubkey>, Bolt12SemanticError> {
280+
let expanded_key = &self.inbound_payment_key;
281+
let entropy = &*self.entropy_source;
282+
283+
#[cfg(feature = "std")]
284+
let builder = refund.respond_using_derived_keys(
285+
payment_paths,
286+
payment_hash,
287+
expanded_key,
288+
entropy,
289+
)?;
290+
#[cfg(not(feature = "std"))]
291+
let created_at = Duration::from_secs(self.highest_seen_timestamp.load(Ordering::Acquire) as u64);
292+
#[cfg(not(feature = "std"))]
293+
let builder = refund.respond_using_derived_keys_no_std(
294+
payment_paths,
295+
payment_hash,
296+
created_at,
297+
expanded_key,
298+
entropy,
299+
)?;
300+
let builder: InvoiceBuilder<DerivedSigningPubkey> = builder.into();
301+
302+
Ok(builder)
303+
}
304+
305+
fn create_blinded_paths(
306+
&self, peers: Vec<MessageForwardNode>, context: MessageContext,
307+
) -> Result<Vec<BlindedMessagePath>, ()> {
308+
let recipient = self.get_our_node_id();
309+
let secp_ctx = &self.secp_ctx;
310+
311+
let peers = peers.iter().map(|node| node.node_id).collect::<Vec<_>>();
312+
313+
self.message_router
314+
.create_blinded_paths(recipient, context, peers, secp_ctx)
315+
.and_then(|paths| (!paths.is_empty()).then(|| paths).ok_or(()))
316+
}
317+
318+
fn create_compact_blinded_paths(
319+
&self, peers: Vec<MessageForwardNode>, context: OffersContext,
320+
) -> Result<Vec<BlindedMessagePath>, ()> {
321+
let recipient = self.get_our_node_id();
322+
let secp_ctx = &self.secp_ctx;
323+
324+
self.message_router
325+
.create_compact_blinded_paths(
326+
recipient,
327+
MessageContext::Offers(context),
328+
peers,
329+
secp_ctx,
330+
)
331+
.and_then(|paths| (!paths.is_empty()).then(|| paths).ok_or(()))
332+
}
333+
334+
fn create_blinded_paths_using_absolute_expiry(
335+
&self, peers: Vec<MessageForwardNode>, context: OffersContext,
336+
absolute_expiry: Option<Duration>,
337+
) -> Result<Vec<BlindedMessagePath>, ()> {
338+
let now = self.duration_since_epoch();
339+
let max_short_lived_absolute_expiry = now.saturating_add(MAX_SHORT_LIVED_RELATIVE_EXPIRY);
340+
341+
if absolute_expiry.unwrap_or(Duration::MAX) <= max_short_lived_absolute_expiry {
342+
self.create_compact_blinded_paths(peers, context)
343+
} else {
344+
self.create_blinded_paths(peers, MessageContext::Offers(context))
345+
}
346+
}
347+
348+
fn enqueue_invoice_request(
349+
&self, invoice_request: InvoiceRequest, reply_paths: Vec<BlindedMessagePath>,
350+
) -> Result<(), Bolt12SemanticError> {
351+
let mut pending_offers_messages = self.pending_offers_messages.lock().unwrap();
352+
if !invoice_request.paths().is_empty() {
353+
reply_paths
354+
.iter()
355+
.flat_map(|reply_path| {
356+
invoice_request.paths().iter().map(move |path| (path, reply_path))
357+
})
358+
.take(OFFERS_MESSAGE_REQUEST_LIMIT)
359+
.for_each(|(path, reply_path)| {
360+
let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
361+
destination: Destination::BlindedPath(path.clone()),
362+
reply_path: reply_path.clone(),
363+
};
364+
let message = OffersMessage::InvoiceRequest(invoice_request.clone());
365+
pending_offers_messages.push((message, instructions));
366+
});
367+
} else if let Some(node_id) = invoice_request.issuer_signing_pubkey() {
368+
for reply_path in reply_paths {
369+
let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
370+
destination: Destination::Node(node_id),
371+
reply_path,
372+
};
373+
let message = OffersMessage::InvoiceRequest(invoice_request.clone());
374+
pending_offers_messages.push((message, instructions));
375+
}
376+
} else {
377+
debug_assert!(false);
378+
return Err(Bolt12SemanticError::MissingIssuerSigningPubkey);
379+
}
380+
381+
Ok(())
382+
}
383+
384+
fn enqueue_invoice(
385+
&self, invoice: Bolt12Invoice, refund: &Refund, reply_paths: Vec<BlindedMessagePath>,
386+
) -> Result<(), Bolt12SemanticError> {
387+
let mut pending_offers_messages = self.pending_offers_messages.lock().unwrap();
388+
if refund.paths().is_empty() {
389+
for reply_path in reply_paths {
390+
let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
391+
destination: Destination::Node(refund.payer_signing_pubkey()),
392+
reply_path,
393+
};
394+
let message = OffersMessage::Invoice(invoice.clone());
395+
pending_offers_messages.push((message, instructions));
396+
}
397+
} else {
398+
reply_paths
399+
.iter()
400+
.flat_map(|reply_path| refund.paths().iter().map(move |path| (path, reply_path)))
401+
.take(OFFERS_MESSAGE_REQUEST_LIMIT)
402+
.for_each(|(path, reply_path)| {
403+
let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
404+
destination: Destination::BlindedPath(path.clone()),
405+
reply_path: reply_path.clone(),
406+
};
407+
let message = OffersMessage::Invoice(invoice.clone());
408+
pending_offers_messages.push((message, instructions));
409+
});
410+
}
411+
412+
Ok(())
413+
}
414+
415+
fn enqueue_dns_onion_message(
416+
&self, message: DNSSECQuery, dns_resolvers: Vec<Destination>,
417+
reply_paths: Vec<BlindedMessagePath>,
418+
) -> Result<(), Bolt12SemanticError> {
419+
let message_params = dns_resolvers
420+
.iter()
421+
.flat_map(|destination| reply_paths.iter().map(move |path| (path, destination)))
422+
.take(OFFERS_MESSAGE_REQUEST_LIMIT);
423+
for (reply_path, destination) in message_params {
424+
self.pending_dns_onion_messages.lock().unwrap().push((
425+
DNSResolverMessage::DNSSECQuery(message.clone()),
426+
MessageSendInstructions::WithSpecifiedReplyPath {
427+
destination: destination.clone(),
428+
reply_path: reply_path.clone(),
429+
},
430+
));
431+
}
432+
433+
Ok(())
434+
}
435+
436+
fn get_and_clear_pending_offers_messages(
437+
&self,
438+
) -> Vec<(OffersMessage, MessageSendInstructions)> {
439+
core::mem::take(&mut self.pending_offers_messages.lock().unwrap())
440+
}
441+
442+
fn get_and_clear_pending_async_messages(
443+
&self,
444+
) -> Vec<(AsyncPaymentsMessage, MessageSendInstructions)> {
445+
core::mem::take(&mut self.pending_async_payments_messages.lock().unwrap())
446+
}
447+
448+
#[cfg(feature = "dnssec")]
449+
fn get_and_clear_pending_dns_messages(
450+
&self,
451+
) -> Vec<(DNSResolverMessage, MessageSendInstructions)> {
452+
core::mem::take(&mut self.pending_dns_onion_messages.lock().unwrap())
453+
}
454+
}
455+
192456
impl<ES: Deref, MR: Deref> chain::Listen for OffersMessageFlow<ES, MR>
193457
where
194458
ES::Target: EntropySource,

0 commit comments

Comments
 (0)