Skip to content

Commit ac3d697

Browse files
committed
Decouple spending from chain notifications
1 parent 46cb5ff commit ac3d697

File tree

2 files changed

+97
-73
lines changed

2 files changed

+97
-73
lines changed

lightning-background-processor/src/lib.rs

+43-6
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,10 @@ use lightning::onion_message::messenger::AOnionMessenger;
3636
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
3737
use lightning::routing::scoring::{ScoreUpdate, WriteableScore};
3838
use lightning::routing::utxo::UtxoLookup;
39+
use lightning::sign::{ChangeDestinationSource, OutputSpender};
3940
use lightning::util::logger::Logger;
40-
use lightning::util::persist::Persister;
41+
use lightning::util::persist::{KVStore, Persister};
42+
use lightning::util::sweep::OutputSweeper;
4143
#[cfg(feature = "std")]
4244
use lightning::util::wakers::Sleeper;
4345
use lightning_rapid_gossip_sync::RapidGossipSync;
@@ -130,6 +132,11 @@ const REBROADCAST_TIMER: u64 = 30;
130132
#[cfg(test)]
131133
const REBROADCAST_TIMER: u64 = 1;
132134

135+
#[cfg(not(test))]
136+
const SWEEPER_TIMER: u64 = 30;
137+
#[cfg(test)]
138+
const SWEEPER_TIMER: u64 = 1;
139+
133140
#[cfg(feature = "futures")]
134141
/// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement
135142
const fn min_u64(a: u64, b: u64) -> u64 {
@@ -306,6 +313,7 @@ macro_rules! define_run_body {
306313
$channel_manager: ident, $process_channel_manager_events: expr,
307314
$onion_messenger: ident, $process_onion_message_handler_events: expr,
308315
$peer_manager: ident, $gossip_sync: ident,
316+
$sweeper: ident,
309317
$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
310318
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
311319
) => { {
@@ -320,6 +328,7 @@ macro_rules! define_run_body {
320328
let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
321329
let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
322330
let mut last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
331+
let mut last_sweeper_call = $get_timer(SWEEPER_TIMER);
323332
let mut have_pruned = false;
324333
let mut have_decayed_scorer = false;
325334

@@ -461,6 +470,12 @@ macro_rules! define_run_body {
461470
$chain_monitor.rebroadcast_pending_claims();
462471
last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
463472
}
473+
474+
if $timer_elapsed(&mut last_sweeper_call, SWEEPER_TIMER) {
475+
log_trace!($logger, "Regenerate sweeper spends if necessary");
476+
let _ = $sweeper.regenerate_and_broadcast_spend_if_necessary_locked();
477+
last_sweeper_call = $get_timer(SWEEPER_TIMER);
478+
}
464479
}
465480

466481
// After we exit, ensure we persist the ChannelManager one final time - this avoids
@@ -922,14 +937,18 @@ impl BackgroundProcessor {
922937
PM: 'static + Deref + Send + Sync,
923938
S: 'static + Deref<Target = SC> + Send + Sync,
924939
SC: for<'b> WriteableScore<'b>,
940+
D: 'static + Deref + Send + Sync,
941+
O: 'static + Deref + Send + Sync,
942+
K: 'static + Deref + Send + Sync,
943+
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>> + Send + Sync,
925944
>(
926945
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
927946
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
928-
logger: L, scorer: Option<S>,
947+
sweeper: OS, logger: L, scorer: Option<S>,
929948
) -> Self
930949
where
931950
UL::Target: 'static + UtxoLookup,
932-
CF::Target: 'static + chain::Filter,
951+
CF::Target: 'static + chain::Filter + Sync + Send,
933952
T::Target: 'static + BroadcasterInterface,
934953
F::Target: 'static + FeeEstimator,
935954
L::Target: 'static + Logger,
@@ -938,6 +957,9 @@ impl BackgroundProcessor {
938957
CM::Target: AChannelManager + Send + Sync,
939958
OM::Target: AOnionMessenger + Send + Sync,
940959
PM::Target: APeerManager + Send + Sync,
960+
O::Target: 'static + OutputSpender + Send + Sync,
961+
D::Target: 'static + ChangeDestinationSource + Send + Sync,
962+
K::Target: 'static + KVStore + Send + Sync,
941963
{
942964
let stop_thread = Arc::new(AtomicBool::new(false));
943965
let stop_thread_clone = stop_thread.clone();
@@ -973,6 +995,7 @@ impl BackgroundProcessor {
973995
},
974996
peer_manager,
975997
gossip_sync,
998+
sweeper,
976999
logger,
9771000
scorer,
9781001
stop_thread.load(Ordering::Acquire),
@@ -1069,7 +1092,7 @@ mod tests {
10691092
use core::sync::atomic::{AtomicBool, Ordering};
10701093
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
10711094
use lightning::chain::transaction::OutPoint;
1072-
use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
1095+
use lightning::chain::{self, chainmonitor, BestBlock, Confirm, Filter};
10731096
use lightning::events::{Event, PathFailure, ReplayEvent};
10741097
use lightning::ln::channelmanager;
10751098
use lightning::ln::channelmanager::{
@@ -1085,6 +1108,7 @@ mod tests {
10851108
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
10861109
use lightning::routing::router::{CandidateRouteHop, DefaultRouter, Path, RouteHop};
10871110
use lightning::routing::scoring::{ChannelUsage, LockableScore, ScoreLookUp, ScoreUpdate};
1111+
use lightning::routing::utxo::UtxoLookup;
10881112
use lightning::sign::{ChangeDestinationSource, InMemorySigner, KeysManager};
10891113
use lightning::types::features::{ChannelFeatures, NodeFeatures};
10901114
use lightning::types::payment::PaymentHash;
@@ -1155,7 +1179,7 @@ mod tests {
11551179

11561180
type ChainMonitor = chainmonitor::ChainMonitor<
11571181
InMemorySigner,
1158-
Arc<test_utils::TestChainSource>,
1182+
Arc<dyn chain::Filter + Sync + Send>,
11591183
Arc<test_utils::TestBroadcaster>,
11601184
Arc<test_utils::TestFeeEstimator>,
11611185
Arc<test_utils::TestLogger>,
@@ -1558,12 +1582,13 @@ mod tests {
15581582
Arc::clone(&keys_manager),
15591583
));
15601584
let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin));
1585+
let x: Arc<dyn Filter + Send + Sync> = chain_source.clone();
15611586
let kv_store =
15621587
Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into()));
15631588
let now = Duration::from_secs(genesis_block.header.time as u64);
15641589
let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
15651590
let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(
1566-
Some(chain_source.clone()),
1591+
Some(x.clone()),
15671592
tx_broadcaster.clone(),
15681593
logger.clone(),
15691594
fee_estimator.clone(),
@@ -1823,6 +1848,7 @@ mod tests {
18231848
let data_dir = nodes[0].kv_store.get_data_dir();
18241849
let persister = Arc::new(Persister::new(data_dir));
18251850
let event_handler = |_: _| Ok(());
1851+
18261852
let bg_processor = BackgroundProcessor::start(
18271853
persister,
18281854
event_handler,
@@ -1831,6 +1857,7 @@ mod tests {
18311857
Some(nodes[0].messenger.clone()),
18321858
nodes[0].p2p_gossip_sync(),
18331859
nodes[0].peer_manager.clone(),
1860+
nodes[0].sweeper.clone(),
18341861
nodes[0].logger.clone(),
18351862
Some(nodes[0].scorer.clone()),
18361863
);
@@ -1924,6 +1951,7 @@ mod tests {
19241951
Some(nodes[0].messenger.clone()),
19251952
nodes[0].no_gossip_sync(),
19261953
nodes[0].peer_manager.clone(),
1954+
nodes[0].sweeper.clone(),
19271955
nodes[0].logger.clone(),
19281956
Some(nodes[0].scorer.clone()),
19291957
);
@@ -1966,6 +1994,7 @@ mod tests {
19661994
Some(nodes[0].messenger.clone()),
19671995
nodes[0].no_gossip_sync(),
19681996
nodes[0].peer_manager.clone(),
1997+
nodes[0].sweeper.clone(),
19691998
nodes[0].logger.clone(),
19701999
Some(nodes[0].scorer.clone()),
19712000
);
@@ -2034,6 +2063,7 @@ mod tests {
20342063
Some(nodes[0].messenger.clone()),
20352064
nodes[0].p2p_gossip_sync(),
20362065
nodes[0].peer_manager.clone(),
2066+
nodes[0].sweeper.clone(),
20372067
nodes[0].logger.clone(),
20382068
Some(nodes[0].scorer.clone()),
20392069
);
@@ -2063,6 +2093,7 @@ mod tests {
20632093
Some(nodes[0].messenger.clone()),
20642094
nodes[0].no_gossip_sync(),
20652095
nodes[0].peer_manager.clone(),
2096+
nodes[0].sweeper.clone(),
20662097
nodes[0].logger.clone(),
20672098
Some(nodes[0].scorer.clone()),
20682099
);
@@ -2109,6 +2140,7 @@ mod tests {
21092140
Some(nodes[0].messenger.clone()),
21102141
nodes[0].no_gossip_sync(),
21112142
nodes[0].peer_manager.clone(),
2143+
nodes[0].sweeper.clone(),
21122144
nodes[0].logger.clone(),
21132145
Some(nodes[0].scorer.clone()),
21142146
);
@@ -2171,6 +2203,7 @@ mod tests {
21712203
Some(nodes[0].messenger.clone()),
21722204
nodes[0].no_gossip_sync(),
21732205
nodes[0].peer_manager.clone(),
2206+
nodes[0].sweeper.clone(),
21742207
nodes[0].logger.clone(),
21752208
Some(nodes[0].scorer.clone()),
21762209
);
@@ -2322,6 +2355,7 @@ mod tests {
23222355
Some(nodes[0].messenger.clone()),
23232356
nodes[0].no_gossip_sync(),
23242357
nodes[0].peer_manager.clone(),
2358+
nodes[0].sweeper.clone(),
23252359
nodes[0].logger.clone(),
23262360
Some(nodes[0].scorer.clone()),
23272361
);
@@ -2351,6 +2385,7 @@ mod tests {
23512385
Some(nodes[0].messenger.clone()),
23522386
nodes[0].no_gossip_sync(),
23532387
nodes[0].peer_manager.clone(),
2388+
nodes[0].sweeper.clone(),
23542389
nodes[0].logger.clone(),
23552390
Some(nodes[0].scorer.clone()),
23562391
);
@@ -2446,6 +2481,7 @@ mod tests {
24462481
Some(nodes[0].messenger.clone()),
24472482
nodes[0].rapid_gossip_sync(),
24482483
nodes[0].peer_manager.clone(),
2484+
nodes[0].sweeper.clone(),
24492485
nodes[0].logger.clone(),
24502486
Some(nodes[0].scorer.clone()),
24512487
);
@@ -2640,6 +2676,7 @@ mod tests {
26402676
Some(nodes[0].messenger.clone()),
26412677
nodes[0].no_gossip_sync(),
26422678
nodes[0].peer_manager.clone(),
2679+
nodes[0].sweeper.clone(),
26432680
nodes[0].logger.clone(),
26442681
Some(nodes[0].scorer.clone()),
26452682
);

lightning/src/util/sweep.rs

+54-67
Original file line numberDiff line numberDiff line change
@@ -416,40 +416,26 @@ where
416416
return Ok(());
417417
}
418418

419-
let spending_tx_opt;
420-
{
421-
let mut state_lock = self.sweeper_state.lock().unwrap();
422-
for descriptor in relevant_descriptors {
423-
let output_info = TrackedSpendableOutput {
424-
descriptor,
425-
channel_id,
426-
status: OutputSpendStatus::PendingInitialBroadcast {
427-
delayed_until_height: delay_until_height,
428-
},
429-
};
430-
431-
if state_lock
432-
.outputs
433-
.iter()
434-
.find(|o| o.descriptor == output_info.descriptor)
435-
.is_some()
436-
{
437-
continue;
438-
}
439-
440-
state_lock.outputs.push(output_info);
419+
let mut state_lock = self.sweeper_state.lock().unwrap();
420+
for descriptor in relevant_descriptors {
421+
let output_info = TrackedSpendableOutput {
422+
descriptor,
423+
channel_id,
424+
status: OutputSpendStatus::PendingInitialBroadcast {
425+
delayed_until_height: delay_until_height,
426+
},
427+
};
428+
429+
if state_lock.outputs.iter().find(|o| o.descriptor == output_info.descriptor).is_some()
430+
{
431+
continue;
441432
}
442-
spending_tx_opt = self.regenerate_spend_if_necessary(&mut *state_lock);
443-
self.persist_state(&*state_lock).map_err(|e| {
444-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
445-
})?;
446-
}
447433

448-
if let Some(spending_tx) = spending_tx_opt {
449-
self.broadcaster.broadcast_transactions(&[&spending_tx]);
434+
state_lock.outputs.push(output_info);
450435
}
451-
452-
Ok(())
436+
self.persist_state(&*state_lock).map_err(|e| {
437+
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
438+
})
453439
}
454440

455441
/// Returns a list of the currently tracked spendable outputs.
@@ -463,6 +449,27 @@ where
463449
self.sweeper_state.lock().unwrap().best_block
464450
}
465451

452+
/// Regenerates and broadcasts the spending transaction for any outputs that are pending
453+
pub fn regenerate_and_broadcast_spend_if_necessary_locked(&self) -> Result<(), ()> {
454+
let mut sweeper_state = self.sweeper_state.lock().unwrap();
455+
self.regenerate_and_broadcast_spend_if_necessary(&mut *sweeper_state)
456+
}
457+
458+
fn regenerate_and_broadcast_spend_if_necessary(
459+
&self, sweeper_state: &mut SweeperState,
460+
) -> Result<(), ()> {
461+
let spending_tx_opt = self.regenerate_spend_if_necessary(sweeper_state);
462+
if let Some(spending_tx) = spending_tx_opt {
463+
self.persist_state(&*sweeper_state).map_err(|e| {
464+
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
465+
})?;
466+
467+
self.broadcaster.broadcast_transactions(&[&spending_tx]);
468+
}
469+
470+
Ok(())
471+
}
472+
466473
fn regenerate_spend_if_necessary(
467474
&self, sweeper_state: &mut SweeperState,
468475
) -> Option<Transaction> {
@@ -601,11 +608,9 @@ where
601608

602609
fn best_block_updated_internal(
603610
&self, sweeper_state: &mut SweeperState, header: &Header, height: u32,
604-
) -> Option<Transaction> {
611+
) {
605612
sweeper_state.best_block = BestBlock::new(header.block_hash(), height);
606613
self.prune_confirmed_outputs(sweeper_state);
607-
let spending_tx_opt = self.regenerate_spend_if_necessary(sweeper_state);
608-
spending_tx_opt
609614
}
610615
}
611616

@@ -623,27 +628,18 @@ where
623628
fn filtered_block_connected(
624629
&self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
625630
) {
626-
let mut spending_tx_opt;
627-
{
628-
let mut state_lock = self.sweeper_state.lock().unwrap();
629-
assert_eq!(state_lock.best_block.block_hash, header.prev_blockhash,
630-
"Blocks must be connected in chain-order - the connected header must build on the last connected header");
631-
assert_eq!(state_lock.best_block.height, height - 1,
632-
"Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
633-
634-
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
635-
spending_tx_opt = self.best_block_updated_internal(&mut *state_lock, header, height);
631+
let mut state_lock = self.sweeper_state.lock().unwrap();
632+
assert_eq!(state_lock.best_block.block_hash, header.prev_blockhash,
633+
"Blocks must be connected in chain-order - the connected header must build on the last connected header");
634+
assert_eq!(state_lock.best_block.height, height - 1,
635+
"Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
636636

637-
self.persist_state(&*state_lock).unwrap_or_else(|e| {
638-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
639-
// Skip broadcasting if the persist failed.
640-
spending_tx_opt = None;
641-
});
642-
}
637+
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
638+
self.best_block_updated_internal(&mut *state_lock, header, height);
643639

644-
if let Some(spending_tx) = spending_tx_opt {
645-
self.broadcaster.broadcast_transactions(&[&spending_tx]);
646-
}
640+
let _ = self.persist_state(&*state_lock).map_err(|e| {
641+
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
642+
});
647643
}
648644

649645
fn block_disconnected(&self, header: &Header, height: u32) {
@@ -717,20 +713,11 @@ where
717713
}
718714

719715
fn best_block_updated(&self, header: &Header, height: u32) {
720-
let mut spending_tx_opt;
721-
{
722-
let mut state_lock = self.sweeper_state.lock().unwrap();
723-
spending_tx_opt = self.best_block_updated_internal(&mut *state_lock, header, height);
724-
self.persist_state(&*state_lock).unwrap_or_else(|e| {
725-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
726-
// Skip broadcasting if the persist failed.
727-
spending_tx_opt = None;
728-
});
729-
}
730-
731-
if let Some(spending_tx) = spending_tx_opt {
732-
self.broadcaster.broadcast_transactions(&[&spending_tx]);
733-
}
716+
let mut state_lock = self.sweeper_state.lock().unwrap();
717+
self.best_block_updated_internal(&mut *state_lock, header, height);
718+
let _ = self.persist_state(&*state_lock).map_err(|e| {
719+
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
720+
});
734721
}
735722

736723
fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {

0 commit comments

Comments
 (0)