Skip to content

Commit a9812ea

Browse files
committed
Add async api
1 parent ac3d697 commit a9812ea

File tree

5 files changed

+275
-62
lines changed

5 files changed

+275
-62
lines changed

lightning-background-processor/src/lib.rs

+27-16
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,10 @@ use lightning::onion_message::messenger::AOnionMessenger;
3636
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
3737
use lightning::routing::scoring::{ScoreUpdate, WriteableScore};
3838
use lightning::routing::utxo::UtxoLookup;
39-
use lightning::sign::{ChangeDestinationSource, OutputSpender};
39+
use lightning::sign::{ChangeDestinationSource, ChangeDestinationSourceSync, OutputSpender};
4040
use lightning::util::logger::Logger;
4141
use lightning::util::persist::{KVStore, Persister};
42-
use lightning::util::sweep::OutputSweeper;
42+
use lightning::util::sweep::{OutputSweeper, OutputSweeperSync};
4343
#[cfg(feature = "std")]
4444
use lightning::util::wakers::Sleeper;
4545
use lightning_rapid_gossip_sync::RapidGossipSync;
@@ -313,7 +313,7 @@ macro_rules! define_run_body {
313313
$channel_manager: ident, $process_channel_manager_events: expr,
314314
$onion_messenger: ident, $process_onion_message_handler_events: expr,
315315
$peer_manager: ident, $gossip_sync: ident,
316-
$sweeper: ident,
316+
$sweeper: expr,
317317
$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
318318
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
319319
) => { {
@@ -473,7 +473,7 @@ macro_rules! define_run_body {
473473

474474
if $timer_elapsed(&mut last_sweeper_call, SWEEPER_TIMER) {
475475
log_trace!($logger, "Regenerate sweeper spends if necessary");
476-
let _ = $sweeper.regenerate_and_broadcast_spend_if_necessary_locked();
476+
let _ = $sweeper;
477477
last_sweeper_call = $get_timer(SWEEPER_TIMER);
478478
}
479479
}
@@ -602,7 +602,7 @@ pub(crate) mod futures_util {
602602
}
603603
}
604604
#[cfg(feature = "futures")]
605-
use core::task;
605+
use core::{task, future::Future};
606606
#[cfg(feature = "futures")]
607607
use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
608608

@@ -757,6 +757,10 @@ pub async fn process_events_async<
757757
+ Sync,
758758
CM: 'static + Deref + Send + Sync,
759759
OM: 'static + Deref + Send + Sync,
760+
D: 'static + Deref + Send + Sync,
761+
O: 'static + Deref + Send + Sync,
762+
K: 'static + Deref + Send + Sync,
763+
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>> + Send + Sync,
760764
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
761765
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
762766
PM: 'static + Deref + Send + Sync,
@@ -768,12 +772,13 @@ pub async fn process_events_async<
768772
>(
769773
persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
770774
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
775+
sweeper: OS,
771776
logger: L, scorer: Option<S>, sleeper: Sleeper, mobile_interruptable_platform: bool,
772777
fetch_time: FetchTime,
773778
) -> Result<(), lightning::io::Error>
774779
where
775780
UL::Target: 'static + UtxoLookup,
776-
CF::Target: 'static + chain::Filter,
781+
CF::Target: 'static + chain::Filter + Sync + Send,
777782
T::Target: 'static + BroadcasterInterface,
778783
F::Target: 'static + FeeEstimator,
779784
L::Target: 'static + Logger,
@@ -782,6 +787,9 @@ where
782787
CM::Target: AChannelManager + Send + Sync,
783788
OM::Target: AOnionMessenger + Send + Sync,
784789
PM::Target: APeerManager + Send + Sync,
790+
O::Target: 'static + OutputSpender + Send + Sync,
791+
D::Target: 'static + ChangeDestinationSource + Send + Sync,
792+
K::Target: 'static + KVStore + Send + Sync,
785793
{
786794
let mut should_break = false;
787795
let async_event_handler = |event| {
@@ -825,6 +833,7 @@ where
825833
},
826834
peer_manager,
827835
gossip_sync,
836+
sweeper.regenerate_and_broadcast_spend_if_necessary_async().await,
828837
logger,
829838
scorer,
830839
should_break,
@@ -937,16 +946,17 @@ impl BackgroundProcessor {
937946
PM: 'static + Deref + Send + Sync,
938947
S: 'static + Deref<Target = SC> + Send + Sync,
939948
SC: for<'b> WriteableScore<'b>,
940-
D: 'static + Deref + Send + Sync,
949+
D: Deref,
941950
O: 'static + Deref + Send + Sync,
942951
K: 'static + Deref + Send + Sync,
943-
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>> + Send + Sync,
952+
OS: 'static + Deref<Target = OutputSweeperSync<T, D, F, CF, K, L, O>> + Send + Sync,
944953
>(
945954
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
946955
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
947956
sweeper: OS, logger: L, scorer: Option<S>,
948957
) -> Self
949958
where
959+
D::Target: ChangeDestinationSourceSync,
950960
UL::Target: 'static + UtxoLookup,
951961
CF::Target: 'static + chain::Filter + Sync + Send,
952962
T::Target: 'static + BroadcasterInterface,
@@ -958,7 +968,6 @@ impl BackgroundProcessor {
958968
OM::Target: AOnionMessenger + Send + Sync,
959969
PM::Target: APeerManager + Send + Sync,
960970
O::Target: 'static + OutputSpender + Send + Sync,
961-
D::Target: 'static + ChangeDestinationSource + Send + Sync,
962971
K::Target: 'static + KVStore + Send + Sync,
963972
{
964973
let stop_thread = Arc::new(AtomicBool::new(false));
@@ -995,7 +1004,7 @@ impl BackgroundProcessor {
9951004
},
9961005
peer_manager,
9971006
gossip_sync,
998-
sweeper,
1007+
sweeper.regenerate_and_broadcast_spend_if_necessary(),
9991008
logger,
10001009
scorer,
10011010
stop_thread.load(Ordering::Acquire),
@@ -1108,8 +1117,7 @@ mod tests {
11081117
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
11091118
use lightning::routing::router::{CandidateRouteHop, DefaultRouter, Path, RouteHop};
11101119
use lightning::routing::scoring::{ChannelUsage, LockableScore, ScoreLookUp, ScoreUpdate};
1111-
use lightning::routing::utxo::UtxoLookup;
1112-
use lightning::sign::{ChangeDestinationSource, InMemorySigner, KeysManager};
1120+
use lightning::sign::{ChangeDestinationSourceSync, InMemorySigner, KeysManager};
11131121
use lightning::types::features::{ChannelFeatures, NodeFeatures};
11141122
use lightning::types::payment::PaymentHash;
11151123
use lightning::util::config::UserConfig;
@@ -1121,7 +1129,7 @@ mod tests {
11211129
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
11221130
};
11231131
use lightning::util::ser::Writeable;
1124-
use lightning::util::sweep::{OutputSpendStatus, OutputSweeper, PRUNE_DELAY_BLOCKS};
1132+
use lightning::util::sweep::{OutputSpendStatus, OutputSweeperSync, PRUNE_DELAY_BLOCKS};
11251133
use lightning::util::test_utils;
11261134
use lightning::{get_event, get_event_msg};
11271135
use lightning_persister::fs_store::FilesystemStore;
@@ -1242,7 +1250,7 @@ mod tests {
12421250
best_block: BestBlock,
12431251
scorer: Arc<LockingWrapper<TestScorer>>,
12441252
sweeper: Arc<
1245-
OutputSweeper<
1253+
OutputSweeperSync<
12461254
Arc<test_utils::TestBroadcaster>,
12471255
Arc<TestWallet>,
12481256
Arc<test_utils::TestFeeEstimator>,
@@ -1543,7 +1551,7 @@ mod tests {
15431551

15441552
struct TestWallet {}
15451553

1546-
impl ChangeDestinationSource for TestWallet {
1554+
impl ChangeDestinationSourceSync for TestWallet {
15471555
fn get_change_destination_script(&self) -> Result<ScriptBuf, ()> {
15481556
Ok(ScriptBuf::new())
15491557
}
@@ -1622,7 +1630,7 @@ mod tests {
16221630
IgnoringMessageHandler {},
16231631
));
16241632
let wallet = Arc::new(TestWallet {});
1625-
let sweeper = Arc::new(OutputSweeper::new(
1633+
let sweeper = Arc::new(OutputSweeperSync::new(
16261634
best_block,
16271635
Arc::clone(&tx_broadcaster),
16281636
Arc::clone(&fee_estimator),
@@ -2027,6 +2035,7 @@ mod tests {
20272035
Some(nodes[0].messenger.clone()),
20282036
nodes[0].rapid_gossip_sync(),
20292037
nodes[0].peer_manager.clone(),
2038+
nodes[0].sweeper.sweeper_async(),
20302039
nodes[0].logger.clone(),
20312040
Some(nodes[0].scorer.clone()),
20322041
move |dur: Duration| {
@@ -2514,6 +2523,7 @@ mod tests {
25142523
Some(nodes[0].messenger.clone()),
25152524
nodes[0].rapid_gossip_sync(),
25162525
nodes[0].peer_manager.clone(),
2526+
nodes[0].sweeper.sweeper_async(),
25172527
nodes[0].logger.clone(),
25182528
Some(nodes[0].scorer.clone()),
25192529
move |dur: Duration| {
@@ -2727,6 +2737,7 @@ mod tests {
27272737
Some(nodes[0].messenger.clone()),
27282738
nodes[0].no_gossip_sync(),
27292739
nodes[0].peer_manager.clone(),
2740+
nodes[0].sweeper.sweeper_async(),
27302741
nodes[0].logger.clone(),
27312742
Some(nodes[0].scorer.clone()),
27322743
move |dur: Duration| {

lightning/src/lib.rs

-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
//! * `grind_signatures`
3131
3232
#![cfg_attr(not(any(test, fuzzing, feature = "_test_utils")), deny(missing_docs))]
33-
#![cfg_attr(not(any(test, feature = "_test_utils")), forbid(unsafe_code))]
3433

3534
#![deny(rustdoc::broken_intra_doc_links)]
3635
#![deny(rustdoc::private_intra_doc_links)]

lightning/src/sign/mod.rs

+35
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ use crate::sign::ecdsa::EcdsaChannelSigner;
6767
use crate::sign::taproot::TaprootChannelSigner;
6868
use crate::util::atomic_counter::AtomicCounter;
6969
use core::convert::TryInto;
70+
use core::future::Future;
71+
use core::ops::Deref;
72+
use core::pin::Pin;
7073
use core::sync::atomic::{AtomicUsize, Ordering};
7174
#[cfg(taproot)]
7275
use musig2::types::{PartialSignature, PublicNonce};
@@ -975,17 +978,49 @@ pub trait SignerProvider {
975978
fn get_shutdown_scriptpubkey(&self) -> Result<ShutdownScript, ()>;
976979
}
977980

981+
982+
/// Result type for `BlockSource` requests.
983+
pub type GetChangeDestinationScriptResult<T> = Result<T, ()>;
984+
985+
/// A type alias for a future that returns a [`GetChangeDestinationScriptResult`].
986+
pub type AsyncGetChangeDestinationScriptResult<'a, T> =
987+
Pin<Box<dyn Future<Output = GetChangeDestinationScriptResult<T>> + 'a + Send>>;
988+
978989
/// A helper trait that describes an on-chain wallet capable of returning a (change) destination
979990
/// script.
980991
pub trait ChangeDestinationSource {
981992
/// Returns a script pubkey which can be used as a change destination for
982993
/// [`OutputSpender::spend_spendable_outputs`].
983994
///
995+
/// This method should return a different value each time it is called, to avoid linking
996+
/// on-chain funds controlled to the same user.
997+
fn get_change_destination_script<'a>(&self) -> AsyncGetChangeDestinationScriptResult<'a, ScriptBuf>;
998+
}
999+
1000+
1001+
/// A synchronous helper trait that describes an on-chain wallet capable of returning a (change) destination script.
1002+
pub trait ChangeDestinationSourceSync {
9841003
/// This method should return a different value each time it is called, to avoid linking
9851004
/// on-chain funds controlled to the same user.
9861005
fn get_change_destination_script(&self) -> Result<ScriptBuf, ()>;
9871006
}
9881007

1008+
/// A wrapper around [`ChangeDestinationSource`] to allow for async calls.
1009+
pub struct ChangeDestinationSourceSyncWrapper<T: Deref>(T) where T::Target:ChangeDestinationSourceSync;
1010+
1011+
impl<T: Deref> ChangeDestinationSourceSyncWrapper<T> where T::Target:ChangeDestinationSourceSync {
1012+
/// Creates a new [`ChangeDestinationSourceSyncWrapper`].
1013+
pub fn new(source: T) -> Self {
1014+
Self(source)
1015+
}
1016+
}
1017+
impl<T: Deref> ChangeDestinationSource for ChangeDestinationSourceSyncWrapper<T> where T::Target:ChangeDestinationSourceSync{
1018+
fn get_change_destination_script<'a>(&self) -> AsyncGetChangeDestinationScriptResult<'a, ScriptBuf> {
1019+
let script = self.0.get_change_destination_script();
1020+
Box::pin(async move { script })
1021+
}
1022+
}
1023+
9891024
mod sealed {
9901025
use bitcoin::secp256k1::{Scalar, SecretKey};
9911026

lightning/src/util/async_poll.rs

+20-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::prelude::*;
1313
use core::future::Future;
1414
use core::marker::Unpin;
1515
use core::pin::Pin;
16-
use core::task::{Context, Poll};
16+
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
1717

1818
pub(crate) enum ResultFuture<F: Future<Output = Result<(), E>>, E: Copy + Unpin> {
1919
Pending(F),
@@ -74,3 +74,22 @@ impl<F: Future<Output = Result<(), E>> + Unpin, E: Copy + Unpin> Future
7474
}
7575
}
7676
}
77+
78+
// If we want to poll a future without an async context to figure out if it has completed or
79+
// not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values
80+
// but sadly there's a good bit of boilerplate here.
81+
fn dummy_waker_clone(_: *const ()) -> RawWaker {
82+
RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)
83+
}
84+
fn dummy_waker_action(_: *const ()) {}
85+
86+
const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
87+
dummy_waker_clone,
88+
dummy_waker_action,
89+
dummy_waker_action,
90+
dummy_waker_action,
91+
);
92+
93+
pub(crate) fn dummy_waker() -> Waker {
94+
unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) }
95+
}

0 commit comments

Comments
 (0)