Skip to content

Commit 8185695

Browse files
committed
Decouple spending from chain notifications
1 parent 46cb5ff commit 8185695

File tree

2 files changed

+139
-87
lines changed

2 files changed

+139
-87
lines changed

lightning-background-processor/src/lib.rs

+93-14
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,10 @@ use lightning::onion_message::messenger::AOnionMessenger;
3636
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
3737
use lightning::routing::scoring::{ScoreUpdate, WriteableScore};
3838
use lightning::routing::utxo::UtxoLookup;
39+
use lightning::sign::{ChangeDestinationSource, OutputSpender};
3940
use lightning::util::logger::Logger;
40-
use lightning::util::persist::Persister;
41+
use lightning::util::persist::{KVStore, Persister};
42+
use lightning::util::sweep::OutputSweeper;
4143
#[cfg(feature = "std")]
4244
use lightning::util::wakers::Sleeper;
4345
use lightning_rapid_gossip_sync::RapidGossipSync;
@@ -130,6 +132,11 @@ const REBROADCAST_TIMER: u64 = 30;
130132
#[cfg(test)]
131133
const REBROADCAST_TIMER: u64 = 1;
132134

135+
#[cfg(not(test))]
136+
const SWEEPER_TIMER: u64 = 30;
137+
#[cfg(test)]
138+
const SWEEPER_TIMER: u64 = 1;
139+
133140
#[cfg(feature = "futures")]
134141
/// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement
135142
const fn min_u64(a: u64, b: u64) -> u64 {
@@ -306,6 +313,7 @@ macro_rules! define_run_body {
306313
$channel_manager: ident, $process_channel_manager_events: expr,
307314
$onion_messenger: ident, $process_onion_message_handler_events: expr,
308315
$peer_manager: ident, $gossip_sync: ident,
316+
$process_sweeper: expr,
309317
$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
310318
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
311319
) => { {
@@ -320,6 +328,7 @@ macro_rules! define_run_body {
320328
let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
321329
let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
322330
let mut last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
331+
let mut last_sweeper_call = $get_timer(SWEEPER_TIMER);
323332
let mut have_pruned = false;
324333
let mut have_decayed_scorer = false;
325334

@@ -461,6 +470,12 @@ macro_rules! define_run_body {
461470
$chain_monitor.rebroadcast_pending_claims();
462471
last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
463472
}
473+
474+
if $timer_elapsed(&mut last_sweeper_call, SWEEPER_TIMER) {
475+
log_trace!($logger, "Regenerating sweeper spends if necessary");
476+
let _ = $process_sweeper;
477+
last_sweeper_call = $get_timer(SWEEPER_TIMER);
478+
}
464479
}
465480

466481
// After we exit, ensure we persist the ChannelManager one final time - this avoids
@@ -618,6 +633,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
618633
/// ```
619634
/// # use lightning::io;
620635
/// # use lightning::events::ReplayEvent;
636+
/// # use lightning::util::sweep::OutputSweeper;
621637
/// # use std::sync::{Arc, RwLock};
622638
/// # use std::sync::atomic::{AtomicBool, Ordering};
623639
/// # use std::time::SystemTime;
@@ -656,6 +672,9 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
656672
/// # F: lightning::chain::Filter + Send + Sync + 'static,
657673
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
658674
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
675+
/// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
676+
/// # K: lightning::util::persist::KVStore + Send + Sync + 'static,
677+
/// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
659678
/// # > {
660679
/// # peer_manager: Arc<PeerManager<B, F, FE, UL>>,
661680
/// # event_handler: Arc<EventHandler>,
@@ -666,14 +685,18 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
666685
/// # persister: Arc<Store>,
667686
/// # logger: Arc<Logger>,
668687
/// # scorer: Arc<Scorer>,
688+
/// # sweeper: Arc<OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<K>, Arc<Logger>, Arc<O>>>,
669689
/// # }
670690
/// #
671691
/// # async fn setup_background_processing<
672692
/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
673693
/// # F: lightning::chain::Filter + Send + Sync + 'static,
674694
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
675695
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
676-
/// # >(node: Node<B, F, FE, UL>) {
696+
/// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
697+
/// # K: lightning::util::persist::KVStore + Send + Sync + 'static,
698+
/// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
699+
/// # >(node: Node<B, F, FE, UL, D, K, O>) {
677700
/// let background_persister = Arc::clone(&node.persister);
678701
/// let background_event_handler = Arc::clone(&node.event_handler);
679702
/// let background_chain_mon = Arc::clone(&node.chain_monitor);
@@ -683,7 +706,8 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
683706
/// let background_onion_messenger = Arc::clone(&node.onion_messenger);
684707
/// let background_logger = Arc::clone(&node.logger);
685708
/// let background_scorer = Arc::clone(&node.scorer);
686-
///
709+
/// let background_sweeper = Arc::clone(&node.sweeper);
710+
687711
/// // Setup the sleeper.
688712
/// let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());
689713
///
@@ -708,6 +732,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
708732
/// Some(background_onion_messenger),
709733
/// background_gossip_sync,
710734
/// background_peer_man,
735+
/// Some(background_sweeper),
711736
/// background_logger,
712737
/// Some(background_scorer),
713738
/// sleeper,
@@ -742,6 +767,10 @@ pub async fn process_events_async<
742767
+ Sync,
743768
CM: 'static + Deref + Send + Sync,
744769
OM: 'static + Deref + Send + Sync,
770+
D: 'static + Deref,
771+
O: 'static + Deref,
772+
K: 'static + Deref,
773+
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>>,
745774
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
746775
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
747776
PM: 'static + Deref + Send + Sync,
@@ -753,12 +782,12 @@ pub async fn process_events_async<
753782
>(
754783
persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
755784
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
756-
logger: L, scorer: Option<S>, sleeper: Sleeper, mobile_interruptable_platform: bool,
757-
fetch_time: FetchTime,
785+
sweeper: Option<OS>, logger: L, scorer: Option<S>, sleeper: Sleeper,
786+
mobile_interruptable_platform: bool, fetch_time: FetchTime,
758787
) -> Result<(), lightning::io::Error>
759788
where
760789
UL::Target: 'static + UtxoLookup,
761-
CF::Target: 'static + chain::Filter,
790+
CF::Target: 'static + chain::Filter + Sync + Send,
762791
T::Target: 'static + BroadcasterInterface,
763792
F::Target: 'static + FeeEstimator,
764793
L::Target: 'static + Logger,
@@ -767,6 +796,9 @@ where
767796
CM::Target: AChannelManager + Send + Sync,
768797
OM::Target: AOnionMessenger + Send + Sync,
769798
PM::Target: APeerManager + Send + Sync,
799+
O::Target: 'static + OutputSpender,
800+
D::Target: 'static + ChangeDestinationSource,
801+
K::Target: 'static + KVStore,
770802
{
771803
let mut should_break = false;
772804
let async_event_handler = |event| {
@@ -810,6 +842,13 @@ where
810842
},
811843
peer_manager,
812844
gossip_sync,
845+
{
846+
if let Some(ref sweeper) = sweeper {
847+
sweeper.regenerate_and_broadcast_spend_if_necessary()
848+
} else {
849+
Ok(())
850+
}
851+
},
813852
logger,
814853
scorer,
815854
should_break,
@@ -922,14 +961,18 @@ impl BackgroundProcessor {
922961
PM: 'static + Deref + Send + Sync,
923962
S: 'static + Deref<Target = SC> + Send + Sync,
924963
SC: for<'b> WriteableScore<'b>,
964+
D: 'static + Deref,
965+
O: 'static + Deref,
966+
K: 'static + Deref,
967+
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>> + Send + Sync,
925968
>(
926969
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
927970
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
928-
logger: L, scorer: Option<S>,
971+
sweeper: Option<OS>, logger: L, scorer: Option<S>,
929972
) -> Self
930973
where
931974
UL::Target: 'static + UtxoLookup,
932-
CF::Target: 'static + chain::Filter,
975+
CF::Target: 'static + chain::Filter + Sync + Send,
933976
T::Target: 'static + BroadcasterInterface,
934977
F::Target: 'static + FeeEstimator,
935978
L::Target: 'static + Logger,
@@ -938,6 +981,9 @@ impl BackgroundProcessor {
938981
CM::Target: AChannelManager + Send + Sync,
939982
OM::Target: AOnionMessenger + Send + Sync,
940983
PM::Target: APeerManager + Send + Sync,
984+
O::Target: 'static + OutputSpender,
985+
D::Target: 'static + ChangeDestinationSource,
986+
K::Target: 'static + KVStore,
941987
{
942988
let stop_thread = Arc::new(AtomicBool::new(false));
943989
let stop_thread_clone = stop_thread.clone();
@@ -973,6 +1019,13 @@ impl BackgroundProcessor {
9731019
},
9741020
peer_manager,
9751021
gossip_sync,
1022+
{
1023+
if let Some(ref sweeper) = sweeper {
1024+
sweeper.regenerate_and_broadcast_spend_if_necessary()
1025+
} else {
1026+
Ok(())
1027+
}
1028+
},
9761029
logger,
9771030
scorer,
9781031
stop_thread.load(Ordering::Acquire),
@@ -1069,7 +1122,7 @@ mod tests {
10691122
use core::sync::atomic::{AtomicBool, Ordering};
10701123
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
10711124
use lightning::chain::transaction::OutPoint;
1072-
use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
1125+
use lightning::chain::{chainmonitor, BestBlock, Confirm};
10731126
use lightning::events::{Event, PathFailure, ReplayEvent};
10741127
use lightning::ln::channelmanager;
10751128
use lightning::ln::channelmanager::{
@@ -1222,7 +1275,7 @@ mod tests {
12221275
Arc<test_utils::TestBroadcaster>,
12231276
Arc<TestWallet>,
12241277
Arc<test_utils::TestFeeEstimator>,
1225-
Arc<dyn Filter + Sync + Send>,
1278+
Arc<test_utils::TestChainSource>,
12261279
Arc<FilesystemStore>,
12271280
Arc<test_utils::TestLogger>,
12281281
Arc<KeysManager>,
@@ -1601,7 +1654,7 @@ mod tests {
16011654
best_block,
16021655
Arc::clone(&tx_broadcaster),
16031656
Arc::clone(&fee_estimator),
1604-
None::<Arc<dyn Filter + Sync + Send>>,
1657+
None::<Arc<test_utils::TestChainSource>>,
16051658
Arc::clone(&keys_manager),
16061659
wallet,
16071660
Arc::clone(&kv_store),
@@ -1831,6 +1884,7 @@ mod tests {
18311884
Some(nodes[0].messenger.clone()),
18321885
nodes[0].p2p_gossip_sync(),
18331886
nodes[0].peer_manager.clone(),
1887+
Some(nodes[0].sweeper.clone()),
18341888
nodes[0].logger.clone(),
18351889
Some(nodes[0].scorer.clone()),
18361890
);
@@ -1924,6 +1978,7 @@ mod tests {
19241978
Some(nodes[0].messenger.clone()),
19251979
nodes[0].no_gossip_sync(),
19261980
nodes[0].peer_manager.clone(),
1981+
Some(nodes[0].sweeper.clone()),
19271982
nodes[0].logger.clone(),
19281983
Some(nodes[0].scorer.clone()),
19291984
);
@@ -1966,6 +2021,7 @@ mod tests {
19662021
Some(nodes[0].messenger.clone()),
19672022
nodes[0].no_gossip_sync(),
19682023
nodes[0].peer_manager.clone(),
2024+
Some(nodes[0].sweeper.clone()),
19692025
nodes[0].logger.clone(),
19702026
Some(nodes[0].scorer.clone()),
19712027
);
@@ -1998,6 +2054,7 @@ mod tests {
19982054
Some(nodes[0].messenger.clone()),
19992055
nodes[0].rapid_gossip_sync(),
20002056
nodes[0].peer_manager.clone(),
2057+
Some(nodes[0].sweeper.clone()),
20012058
nodes[0].logger.clone(),
20022059
Some(nodes[0].scorer.clone()),
20032060
move |dur: Duration| {
@@ -2034,6 +2091,7 @@ mod tests {
20342091
Some(nodes[0].messenger.clone()),
20352092
nodes[0].p2p_gossip_sync(),
20362093
nodes[0].peer_manager.clone(),
2094+
Some(nodes[0].sweeper.clone()),
20372095
nodes[0].logger.clone(),
20382096
Some(nodes[0].scorer.clone()),
20392097
);
@@ -2063,6 +2121,7 @@ mod tests {
20632121
Some(nodes[0].messenger.clone()),
20642122
nodes[0].no_gossip_sync(),
20652123
nodes[0].peer_manager.clone(),
2124+
Some(nodes[0].sweeper.clone()),
20662125
nodes[0].logger.clone(),
20672126
Some(nodes[0].scorer.clone()),
20682127
);
@@ -2109,6 +2168,7 @@ mod tests {
21092168
Some(nodes[0].messenger.clone()),
21102169
nodes[0].no_gossip_sync(),
21112170
nodes[0].peer_manager.clone(),
2171+
Some(nodes[0].sweeper.clone()),
21122172
nodes[0].logger.clone(),
21132173
Some(nodes[0].scorer.clone()),
21142174
);
@@ -2171,6 +2231,7 @@ mod tests {
21712231
Some(nodes[0].messenger.clone()),
21722232
nodes[0].no_gossip_sync(),
21732233
nodes[0].peer_manager.clone(),
2234+
Some(nodes[0].sweeper.clone()),
21742235
nodes[0].logger.clone(),
21752236
Some(nodes[0].scorer.clone()),
21762237
);
@@ -2216,10 +2277,22 @@ mod tests {
22162277

22172278
advance_chain(&mut nodes[0], 3);
22182279

2280+
let tx_broadcaster = nodes[0].tx_broadcaster.clone();
2281+
let wait_for_sweep_tx = || -> Transaction {
2282+
loop {
2283+
let sweep_tx = tx_broadcaster.txn_broadcasted.lock().unwrap().pop();
2284+
if let Some(sweep_tx) = sweep_tx {
2285+
return sweep_tx;
2286+
}
2287+
2288+
std::thread::sleep(Duration::from_millis(100));
2289+
}
2290+
};
2291+
22192292
// Check we generate an initial sweeping tx.
22202293
assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
2294+
let sweep_tx_0 = wait_for_sweep_tx();
22212295
let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
2222-
let sweep_tx_0 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
22232296
match tracked_output.status {
22242297
OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
22252298
assert_eq!(sweep_tx_0.compute_txid(), latest_spending_tx.compute_txid());
@@ -2230,8 +2303,8 @@ mod tests {
22302303
// Check we regenerate and rebroadcast the sweeping tx each block.
22312304
advance_chain(&mut nodes[0], 1);
22322305
assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
2306+
let sweep_tx_1 = wait_for_sweep_tx();
22332307
let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
2234-
let sweep_tx_1 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
22352308
match tracked_output.status {
22362309
OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
22372310
assert_eq!(sweep_tx_1.compute_txid(), latest_spending_tx.compute_txid());
@@ -2242,8 +2315,8 @@ mod tests {
22422315

22432316
advance_chain(&mut nodes[0], 1);
22442317
assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
2318+
let sweep_tx_2 = wait_for_sweep_tx();
22452319
let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
2246-
let sweep_tx_2 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
22472320
match tracked_output.status {
22482321
OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
22492322
assert_eq!(sweep_tx_2.compute_txid(), latest_spending_tx.compute_txid());
@@ -2322,6 +2395,7 @@ mod tests {
23222395
Some(nodes[0].messenger.clone()),
23232396
nodes[0].no_gossip_sync(),
23242397
nodes[0].peer_manager.clone(),
2398+
Some(nodes[0].sweeper.clone()),
23252399
nodes[0].logger.clone(),
23262400
Some(nodes[0].scorer.clone()),
23272401
);
@@ -2351,6 +2425,7 @@ mod tests {
23512425
Some(nodes[0].messenger.clone()),
23522426
nodes[0].no_gossip_sync(),
23532427
nodes[0].peer_manager.clone(),
2428+
Some(nodes[0].sweeper.clone()),
23542429
nodes[0].logger.clone(),
23552430
Some(nodes[0].scorer.clone()),
23562431
);
@@ -2446,6 +2521,7 @@ mod tests {
24462521
Some(nodes[0].messenger.clone()),
24472522
nodes[0].rapid_gossip_sync(),
24482523
nodes[0].peer_manager.clone(),
2524+
Some(nodes[0].sweeper.clone()),
24492525
nodes[0].logger.clone(),
24502526
Some(nodes[0].scorer.clone()),
24512527
);
@@ -2478,6 +2554,7 @@ mod tests {
24782554
Some(nodes[0].messenger.clone()),
24792555
nodes[0].rapid_gossip_sync(),
24802556
nodes[0].peer_manager.clone(),
2557+
Some(nodes[0].sweeper.clone()),
24812558
nodes[0].logger.clone(),
24822559
Some(nodes[0].scorer.clone()),
24832560
move |dur: Duration| {
@@ -2640,6 +2717,7 @@ mod tests {
26402717
Some(nodes[0].messenger.clone()),
26412718
nodes[0].no_gossip_sync(),
26422719
nodes[0].peer_manager.clone(),
2720+
Some(nodes[0].sweeper.clone()),
26432721
nodes[0].logger.clone(),
26442722
Some(nodes[0].scorer.clone()),
26452723
);
@@ -2690,6 +2768,7 @@ mod tests {
26902768
Some(nodes[0].messenger.clone()),
26912769
nodes[0].no_gossip_sync(),
26922770
nodes[0].peer_manager.clone(),
2771+
Some(nodes[0].sweeper.clone()),
26932772
nodes[0].logger.clone(),
26942773
Some(nodes[0].scorer.clone()),
26952774
move |dur: Duration| {

0 commit comments

Comments
 (0)