Skip to content

Commit 04f3894

Browse files
committed
Optimize monitor persistence in fuzz harness
Previously, TestChainMonitor::update_channel would deserialize the monitor from stored bytes, apply the update, and serialize it back. This duplicated the work already done by the inner ChainMonitor, which applies the update to its in-memory monitor and calls the persister. Instead, have TestPersister capture the monitor directly when the real ChainMonitor calls persist. Serialization is deferred until reload_node actually needs the bytes, which happens rarely (only on specific fuzz input bytes that trigger a node restart). This eliminates redundant deserialization and serialization on every monitor update, replacing the expensive serialize-on-every-persist with a cheaper clone. AI tools were used in preparing this commit.
1 parent abb98ae commit 04f3894

File tree

3 files changed

+52
-56
lines changed

3 files changed

+52
-56
lines changed

fuzz/src/chanmon_consistency.rs

Lines changed: 29 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -244,16 +244,14 @@ struct LatestMonitorState {
244244
/// which we haven't yet completed. We're allowed to reload with those as well, at least until
245245
/// they're completed.
246246
persisted_monitor_id: u64,
247-
/// The latest serialized `ChannelMonitor` that we told LDK we persisted.
248-
persisted_monitor: Vec<u8>,
249-
/// A set of (monitor id, serialized `ChannelMonitor`)s which we're currently "persisting",
247+
/// The latest `ChannelMonitor` that we told LDK we persisted.
248+
persisted_monitor: channelmonitor::ChannelMonitor<TestChannelSigner>,
249+
/// A set of (monitor id, `ChannelMonitor`)s which we're currently "persisting",
250250
/// from LDK's perspective.
251-
pending_monitors: Vec<(u64, Vec<u8>)>,
251+
pending_monitors: Vec<(u64, channelmonitor::ChannelMonitor<TestChannelSigner>)>,
252252
}
253253

254254
struct TestChainMonitor {
255-
pub logger: Arc<dyn Logger>,
256-
pub keys: Arc<KeyProvider>,
257255
pub persister: Arc<TestPersister>,
258256
pub chain_monitor: Arc<
259257
chainmonitor::ChainMonitor<
@@ -277,15 +275,13 @@ impl TestChainMonitor {
277275
chain_monitor: Arc::new(chainmonitor::ChainMonitor::new(
278276
None,
279277
broadcaster,
280-
logger.clone(),
278+
logger,
281279
feeest,
282280
Arc::clone(&persister),
283281
Arc::clone(&keys),
284282
keys.get_peer_storage_key(),
285283
false,
286284
)),
287-
logger,
288-
keys,
289285
persister,
290286
latest_monitors: Mutex::new(new_hash_map()),
291287
}
@@ -295,20 +291,22 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
295291
fn watch_channel(
296292
&self, channel_id: ChannelId, monitor: channelmonitor::ChannelMonitor<TestChannelSigner>,
297293
) -> Result<chain::ChannelMonitorUpdateStatus, ()> {
298-
let mut ser = VecWriter(Vec::new());
299-
monitor.write(&mut ser).unwrap();
300294
let monitor_id = monitor.get_latest_update_id();
301295
let res = self.chain_monitor.watch_channel(channel_id, monitor);
296+
let mon = self.persister.take_latest_monitor(&channel_id);
302297
let state = match res {
303298
Ok(chain::ChannelMonitorUpdateStatus::Completed) => LatestMonitorState {
304299
persisted_monitor_id: monitor_id,
305-
persisted_monitor: ser.0,
300+
persisted_monitor: mon,
306301
pending_monitors: Vec::new(),
307302
},
308-
Ok(chain::ChannelMonitorUpdateStatus::InProgress) => LatestMonitorState {
309-
persisted_monitor_id: monitor_id,
310-
persisted_monitor: Vec::new(),
311-
pending_monitors: vec![(monitor_id, ser.0)],
303+
Ok(chain::ChannelMonitorUpdateStatus::InProgress) => {
304+
let persisted_monitor = mon.clone();
305+
LatestMonitorState {
306+
persisted_monitor_id: monitor_id,
307+
persisted_monitor,
308+
pending_monitors: vec![(monitor_id, mon)],
309+
}
312310
},
313311
Ok(chain::ChannelMonitorUpdateStatus::UnrecoverableError) => panic!(),
314312
Err(()) => panic!(),
@@ -324,37 +322,15 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
324322
) -> chain::ChannelMonitorUpdateStatus {
325323
let mut map_lock = self.latest_monitors.lock().unwrap();
326324
let map_entry = map_lock.get_mut(&channel_id).expect("Didn't have monitor on update call");
327-
let latest_monitor_data = map_entry
328-
.pending_monitors
329-
.last()
330-
.as_ref()
331-
.map(|(_, data)| data)
332-
.unwrap_or(&map_entry.persisted_monitor);
333-
let deserialized_monitor =
334-
<(BlockHash, channelmonitor::ChannelMonitor<TestChannelSigner>)>::read(
335-
&mut &latest_monitor_data[..],
336-
(&*self.keys, &*self.keys),
337-
)
338-
.unwrap()
339-
.1;
340-
deserialized_monitor
341-
.update_monitor(
342-
update,
343-
&&TestBroadcaster { txn_broadcasted: RefCell::new(Vec::new()) },
344-
&&FuzzEstimator { ret_val: atomic::AtomicU32::new(253) },
345-
&self.logger,
346-
)
347-
.unwrap();
348-
let mut ser = VecWriter(Vec::new());
349-
deserialized_monitor.write(&mut ser).unwrap();
350325
let res = self.chain_monitor.update_channel(channel_id, update);
326+
let mon = self.persister.take_latest_monitor(&channel_id);
351327
match res {
352328
chain::ChannelMonitorUpdateStatus::Completed => {
353329
map_entry.persisted_monitor_id = update.update_id;
354-
map_entry.persisted_monitor = ser.0;
330+
map_entry.persisted_monitor = mon;
355331
},
356332
chain::ChannelMonitorUpdateStatus::InProgress => {
357-
map_entry.pending_monitors.push((update.update_id, ser.0));
333+
map_entry.pending_monitors.push((update.update_id, mon));
358334
},
359335
chain::ChannelMonitorUpdateStatus::UnrecoverableError => panic!(),
360336
}
@@ -914,9 +890,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
914890
$broadcaster.clone(),
915891
logger.clone(),
916892
$fee_estimator.clone(),
917-
Arc::new(TestPersister {
918-
update_ret: Mutex::new(mon_style[$node_id as usize].borrow().clone()),
919-
}),
893+
Arc::new(TestPersister::new(mon_style[$node_id as usize].borrow().clone())),
920894
Arc::clone(&keys_manager),
921895
));
922896

@@ -966,9 +940,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
966940
broadcaster.clone(),
967941
logger.clone(),
968942
Arc::clone(fee_estimator),
969-
Arc::new(TestPersister {
970-
update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed),
971-
}),
943+
Arc::new(TestPersister::new(ChannelMonitorUpdateStatus::Completed)),
972944
Arc::clone(keys),
973945
));
974946

@@ -983,7 +955,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
983955
let mut monitors = new_hash_map();
984956
let mut old_monitors = old_monitors.latest_monitors.lock().unwrap();
985957
for (channel_id, mut prev_state) in old_monitors.drain() {
986-
let (mon_id, serialized_mon) = if use_old_mons % 3 == 0 {
958+
let (mon_id, mon) = if use_old_mons % 3 == 0 {
987959
// Reload with the oldest `ChannelMonitor` (the one that we already told
988960
// `ChannelManager` we finished persisting).
989961
(prev_state.persisted_monitor_id, prev_state.persisted_monitor)
@@ -999,14 +971,17 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
999971
// Use a different value of `use_old_mons` if we have another monitor (only for node B)
1000972
// by shifting `use_old_mons` one in base-3.
1001973
use_old_mons /= 3;
1002-
let mon = <(BlockHash, ChannelMonitor<TestChannelSigner>)>::read(
1003-
&mut &serialized_mon[..],
974+
// Serialize and deserialize the monitor to verify round-trip correctness.
975+
let mut ser = VecWriter(Vec::new());
976+
mon.write(&mut ser).unwrap();
977+
let (_, deserialized_mon) = <(BlockHash, ChannelMonitor<TestChannelSigner>)>::read(
978+
&mut &ser.0[..],
1004979
(&**keys, &**keys),
1005980
)
1006981
.expect("Failed to read monitor");
1007-
monitors.insert(channel_id, mon.1);
982+
monitors.insert(channel_id, deserialized_mon);
1008983
// Update the latest `ChannelMonitor` state to match what we just told LDK.
1009-
prev_state.persisted_monitor = serialized_mon;
984+
prev_state.persisted_monitor = mon;
1010985
prev_state.persisted_monitor_id = mon_id;
1011986
// Wipe any `ChannelMonitor`s which we never told LDK we finished persisting,
1012987
// considering them discarded. LDK should replay these for us as they're stored in
@@ -1981,10 +1956,11 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
19811956

19821957
let complete_first = |v: &mut Vec<_>| if !v.is_empty() { Some(v.remove(0)) } else { None };
19831958
let complete_second = |v: &mut Vec<_>| if v.len() > 1 { Some(v.remove(1)) } else { None };
1959+
type PendingMonitors = Vec<(u64, channelmonitor::ChannelMonitor<TestChannelSigner>)>;
19841960
let complete_monitor_update =
19851961
|monitor: &Arc<TestChainMonitor>,
19861962
chan_funding,
1987-
compl_selector: &dyn Fn(&mut Vec<(u64, Vec<u8>)>) -> Option<(u64, Vec<u8>)>| {
1963+
compl_selector: &dyn Fn(&mut PendingMonitors) -> Option<(u64, channelmonitor::ChannelMonitor<TestChannelSigner>)>| {
19881964
if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_funding) {
19891965
assert!(
19901966
state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),

fuzz/src/full_stack.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -598,7 +598,7 @@ pub fn do_test(mut data: &[u8], logger: &Arc<dyn Logger + MaybeSend + MaybeSync>
598598
broadcast.clone(),
599599
Arc::clone(&logger),
600600
fee_est.clone(),
601-
Arc::new(TestPersister { update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed) }),
601+
Arc::new(TestPersister::new(ChannelMonitorUpdateStatus::Completed)),
602602
Arc::clone(&keys_manager),
603603
keys_manager.get_peer_storage_key(),
604604
false,

fuzz/src/utils/test_persister.rs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,45 @@
11
use lightning::chain;
22
use lightning::chain::{chainmonitor, channelmonitor};
3+
use lightning::ln::types::ChannelId;
34
use lightning::util::persist::MonitorName;
45
use lightning::util::test_channel_signer::TestChannelSigner;
56

7+
use std::collections::HashMap;
68
use std::sync::Mutex;
79

810
pub struct TestPersister {
911
pub update_ret: Mutex<chain::ChannelMonitorUpdateStatus>,
12+
latest_monitors: Mutex<HashMap<ChannelId, channelmonitor::ChannelMonitor<TestChannelSigner>>>,
13+
}
14+
impl TestPersister {
15+
pub fn new(update_ret: chain::ChannelMonitorUpdateStatus) -> Self {
16+
Self {
17+
update_ret: Mutex::new(update_ret),
18+
latest_monitors: Mutex::new(HashMap::new()),
19+
}
20+
}
21+
22+
pub fn take_latest_monitor(
23+
&self, channel_id: &ChannelId,
24+
) -> channelmonitor::ChannelMonitor<TestChannelSigner> {
25+
self.latest_monitors.lock().unwrap().remove(channel_id)
26+
.expect("Persister should have monitor for channel")
27+
}
1028
}
1129
impl chainmonitor::Persist<TestChannelSigner> for TestPersister {
1230
fn persist_new_channel(
1331
&self, _monitor_name: MonitorName,
14-
_data: &channelmonitor::ChannelMonitor<TestChannelSigner>,
32+
data: &channelmonitor::ChannelMonitor<TestChannelSigner>,
1533
) -> chain::ChannelMonitorUpdateStatus {
34+
self.latest_monitors.lock().unwrap().insert(data.channel_id(), data.clone());
1635
self.update_ret.lock().unwrap().clone()
1736
}
1837

1938
fn update_persisted_channel(
2039
&self, _monitor_name: MonitorName, _update: Option<&channelmonitor::ChannelMonitorUpdate>,
21-
_data: &channelmonitor::ChannelMonitor<TestChannelSigner>,
40+
data: &channelmonitor::ChannelMonitor<TestChannelSigner>,
2241
) -> chain::ChannelMonitorUpdateStatus {
42+
self.latest_monitors.lock().unwrap().insert(data.channel_id(), data.clone());
2343
self.update_ret.lock().unwrap().clone()
2444
}
2545

0 commit comments

Comments
 (0)