diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index a7a0942f0c8..980325ac912 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -1473,6 +1473,11 @@ enum BackgroundEvent { channel_id: ChannelId, highest_update_id_completed: u64, }, + /// A channel had blocked monitor updates waiting on startup. If the updates were blocked on + /// an MPP claim blocker not written to disk, we may be able to unblock them now. + /// + /// This event is never written to disk. + AttemptUnblockMonitorUpdates { counterparty_node_id: PublicKey, channel_id: ChannelId }, } /// A pointer to a channel that is unblocked when an event is surfaced @@ -8795,6 +8800,12 @@ impl< &counterparty_node_id, ); }, + BackgroundEvent::AttemptUnblockMonitorUpdates { + counterparty_node_id, + channel_id, + } => { + self.handle_monitor_update_release(counterparty_node_id, channel_id, None); + }, } } NotifyOption::DoPersist @@ -9751,6 +9762,7 @@ impl< BackgroundEvent::MonitorUpdatesComplete { channel_id, .. } => *channel_id == _prev_channel_id, + BackgroundEvent::AttemptUnblockMonitorUpdates { .. } => false, } }); assert!(channel_closed || matching_bg_event, "{:?}", *background_events); @@ -19456,6 +19468,14 @@ impl< log_error!(logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning"); return Err(DecodeError::DangerousValue); } + if funded_chan.blocked_monitor_updates_pending() > 0 { + pending_background_events.push( + BackgroundEvent::AttemptUnblockMonitorUpdates { + counterparty_node_id: *counterparty_id, + channel_id: *chan_id, + }, + ); + } } else { // We shouldn't have persisted (or read) any unfunded channel types so none should have been // created in this `channel_by_id` map. diff --git a/lightning/src/ln/reload_tests.rs b/lightning/src/ln/reload_tests.rs index 16ba896685e..9da90d95109 100644 --- a/lightning/src/ln/reload_tests.rs +++ b/lightning/src/ln/reload_tests.rs @@ -937,6 +937,358 @@ fn test_partial_claim_before_restart() { do_test_partial_claim_before_restart(true, true); } +#[test] +fn test_mpp_claim_htlc_fulfills_unblocked_on_reload() { + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let persister; + let new_chain_monitor; + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); + let nodes_1_deserialized; + let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs); + + // Open two independent channels between the same nodes. The payment below is large enough to + // force the router to split it across both channels, which is what makes the MPP claim depend + // on both ChannelMonitors durably learning the preimage. + let chan_a = create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 100_000, 0); + let chan_b = create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 100_000, 0); + let chan_id_a = chan_a.2; + let chan_id_b = chan_b.2; + let scid_a = chan_a.0.contents.short_channel_id; + let scid_b = chan_b.0.contents.short_channel_id; + + // Send an MPP payment to nodes[1]. `send_along_route_with_secret` leaves the payment + // claimable but unclaimed, so nodes[1] still has both inbound HTLCs live when we start + // manipulating monitor persistence below. 
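+ // A one-hop route split into two parts keeps each HTLC directly on chan_a or chan_b, so the
+ // claim below produces exactly one preimage-carrying monitor update per channel.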
+
+ let amt_msat = 50_000_000;
+ let (route, payment_hash, payment_preimage, payment_secret) =
+ get_route_and_payment_hash!(nodes[0], nodes[1], amt_msat);
+ assert_eq!(route.paths.len(), 2);
+ send_along_route_with_secret(
+ &nodes[0], route, &[&[&nodes[1]], &[&nodes[1]]], amt_msat, payment_hash,
+ payment_secret,
+ );
+
+ // Move both channels into `AWAITING_REMOTE_REVOKE` by having nodes[0] send fee updates and
+ // withholding nodes[1]'s responding `commitment_signed`s. When nodes[1] later claims the
+ // payment, the fulfill updates cannot be sent immediately and instead sit in each channel's
+ // holding cell.
+ {
+ let mut fee_est = chanmon_cfgs[0].fee_estimator.sat_per_kw.lock().unwrap();
+ *fee_est *= 2;
+ }
+ nodes[0].node.timer_tick_occurred();
+ check_added_monitors(&nodes[0], 2);
+
+ let node_0_id = nodes[0].node.get_our_node_id();
+ let node_1_id = nodes[1].node.get_our_node_id();
+
+ let fee_msgs = nodes[0].node.get_and_clear_pending_msg_events();
+ assert_eq!(fee_msgs.len(), 2);
+ for ev in &fee_msgs {
+ match ev {
+ MessageSendEvent::UpdateHTLCs { updates, .. } => {
+ nodes[1].node.handle_update_fee(node_0_id, updates.update_fee.as_ref().unwrap());
+ nodes[1].node.handle_commitment_signed_batch_test(
+ node_0_id, &updates.commitment_signed,
+ );
+ check_added_monitors(&nodes[1], 1);
+ },
+ _ => panic!("Unexpected message: {:?}", ev),
+ }
+ }
+
+ // nodes[1] responds to each fee update with a `revoke_and_ack` and a new
+ // `commitment_signed`. Deliver only the `revoke_and_ack`s for now. The held
+ // `commitment_signed`s are delivered after nodes[1] claims the payment, creating the blocked
+ // post-claim monitor updates whose release is exercised after reload.
+ let node_1_msgs = nodes[1].node.get_and_clear_pending_msg_events();
+ let mut commitment_signed_msgs = Vec::new();
+ for ev in &node_1_msgs {
+ match ev {
+ MessageSendEvent::SendRevokeAndACK { msg, .. } => {
+ nodes[0].node.handle_revoke_and_ack(node_1_id, msg);
+ check_added_monitors(&nodes[0], 1);
+ },
+ MessageSendEvent::UpdateHTLCs { updates, .. } => {
+ commitment_signed_msgs.push(updates.commitment_signed.clone());
+ },
+ _ => panic!("Unexpected message: {:?}", ev),
+ }
+ }
+
+ // nodes[0] cannot send its own `revoke_and_ack`s until the withheld `commitment_signed`s are
+ // delivered, so it should have nothing further to send at this point.
+ assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
+
+ // Snapshot channel B before the claim. The in-memory ChainMonitor applies updates even when
+ // the persister returns `InProgress`, so taking this snapshot after the claim would not model a
+ // crash between two separate monitor writes.
+ let mon_b_serialized = get_monitor!(nodes[1], chan_id_b).encode();
+
+ // Make both preimage monitor writes asynchronous. `claim_funds` attaches an in-memory MPP RAA
+ // blocker so neither channel can release later monitor updates until all channels have the
+ // preimage durably persisted.
+ chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
+ chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
+ nodes[1].node.claim_funds(payment_preimage);
+ check_added_monitors(&nodes[1], 2);
+
+ // Complete only channel A's preimage update. Channel B will be reloaded from the stale snapshot
+ // above, simulating a crash where one monitor write completed and the other did not.
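+ // `force_channel_monitor_updated` reports the pending async persistence as completed through
+ // the given update id, so only channel A's monitor is durably persisted with the preimage.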
+ let (update_id_a, _) = get_latest_mon_update_id(&nodes[1], chan_id_a); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(chan_id_a, update_id_a); + + // Now finish the fee-update commitment dance we held back. nodes[1] receives nodes[0]'s + // `revoke_and_ack`s while the MPP RAA blocker is still in place, so the resulting monitor + // updates are blocked behind state that is not serialized in the ChannelManager. + for commitment_signed in &commitment_signed_msgs { + nodes[0].node.handle_commitment_signed_batch_test(node_1_id, commitment_signed); + check_added_monitors(&nodes[0], 1); + } + let node_0_msgs = nodes[0].node.get_and_clear_pending_msg_events(); + for ev in &node_0_msgs { + match ev { + MessageSendEvent::SendRevokeAndACK { msg, .. } => { + nodes[1].node.handle_revoke_and_ack(node_0_id, msg); + check_added_monitors(&nodes[1], 0); + }, + _ => panic!("Unexpected message: {:?}", ev), + } + } + + // Persist the ChannelManager after the blocked post-claim monitor updates have been recorded. + // Reload with channel A's up-to-date monitor and channel B's stale monitor. The preimage update + // for B is replayed during reload, putting both channels' preimages on disk. The remaining state + // under test is the blocked post-claim `revoke_and_ack` monitor updates after the in-memory MPP + // RAA blocker that created them is gone. + let node_1_serialized = nodes[1].node.encode(); + let mon_a_serialized = get_monitor!(nodes[1], chan_id_a).encode(); + + nodes[0].node.peer_disconnected(node_1_id); + reload_node!( + nodes[1], + node_1_serialized, + &[&mon_a_serialized, &mon_b_serialized], + persister, + new_chain_monitor, + nodes_1_deserialized + ); + + // Reconnect both peers by manually exchanging `channel_reestablish`s. This avoids relying on a + // more general reconnect helper while the channels intentionally have asymmetric monitor state. + let node_1_id = nodes[1].node.get_our_node_id(); + nodes[0].node.peer_connected(node_1_id, &msgs::Init { + features: nodes[1].node.init_features(), networks: None, remote_network_address: None, + }, true).unwrap(); + nodes[1].node.peer_connected(node_0_id, &msgs::Init { + features: nodes[0].node.init_features(), networks: None, remote_network_address: None, + }, false).unwrap(); + + let reestablish_0 = nodes[0].node.get_and_clear_pending_msg_events(); + let reestablish_1 = nodes[1].node.get_and_clear_pending_msg_events(); + let mut reestablish_0_chan_ids = Vec::new(); + let mut reestablish_1_chan_ids = Vec::new(); + for ev in &reestablish_1 { + match ev { + MessageSendEvent::SendChannelReestablish { node_id, msg } => { + assert_eq!(*node_id, node_0_id); + reestablish_1_chan_ids.push(msg.channel_id); + nodes[0].node.handle_channel_reestablish(node_1_id, msg); + }, + _ => panic!("Unexpected message: {:?}", ev), + } + } + for ev in &reestablish_0 { + match ev { + MessageSendEvent::SendChannelReestablish { node_id, msg } => { + assert_eq!(*node_id, node_1_id); + reestablish_0_chan_ids.push(msg.channel_id); + nodes[1].node.handle_channel_reestablish(node_0_id, msg); + }, + _ => panic!("Unexpected message: {:?}", ev), + } + } + assert_eq!(reestablish_0_chan_ids.len(), 2); + assert!(reestablish_0_chan_ids.contains(&chan_id_a)); + assert!(reestablish_0_chan_ids.contains(&chan_id_b)); + assert_eq!(reestablish_1_chan_ids.len(), 2); + assert!(reestablish_1_chan_ids.contains(&chan_id_a)); + assert!(reestablish_1_chan_ids.contains(&chan_id_b)); + // Only nodes[1] was reloaded with stale monitor state. 
+ // nodes[0] responds to the
+ // `channel_reestablish`s without touching its monitors. nodes[1] applies the replayed channel B
+ // preimage update, releases channel A's held RAA update, and frees channel A's held fulfill
+ // during startup processing.
+ check_added_monitors(&nodes[0], 0);
+ check_added_monitors(&nodes[1], 3);
+
+ // The first message batch after reconnect contains channel updates from both nodes. nodes[1]
+ // also sends the channel A fulfill that startup processing released from the holding cell.
+ let restart_msgs_0 = nodes[0].node.get_and_clear_pending_msg_events();
+ let restart_msgs_1 = nodes[1].node.get_and_clear_pending_msg_events();
+ let mut restart_scids_0 = Vec::new();
+ let mut restart_scids_1 = Vec::new();
+ let mut startup_fulfill_chan_ids = Vec::new();
+ for ev in &restart_msgs_0 {
+ match ev {
+ MessageSendEvent::SendChannelUpdate { node_id, msg } => {
+ assert_eq!(*node_id, node_1_id);
+ restart_scids_0.push(msg.contents.short_channel_id);
+ },
+ _ => panic!("Unexpected restart message from node 0: {:?}", ev),
+ }
+ }
+ for ev in &restart_msgs_1 {
+ match ev {
+ MessageSendEvent::SendChannelUpdate { node_id, msg } => {
+ assert_eq!(*node_id, node_0_id);
+ restart_scids_1.push(msg.contents.short_channel_id);
+ },
+ MessageSendEvent::UpdateHTLCs { node_id, channel_id, updates } => {
+ assert_eq!(*node_id, node_0_id);
+ startup_fulfill_chan_ids.push(*channel_id);
+ assert_eq!(updates.update_fulfill_htlcs.len(), 1);
+ assert!(updates.update_add_htlcs.is_empty());
+ assert!(updates.update_fail_htlcs.is_empty());
+ assert!(updates.update_fail_malformed_htlcs.is_empty());
+ assert!(updates.update_fee.is_none());
+ for fulfill in &updates.update_fulfill_htlcs {
+ nodes[0].node.handle_update_fulfill_htlc(node_1_id, fulfill.clone());
+ }
+ // Complete the standard commitment handshake for the released fulfill. The helper
+ // checks nodes[0]'s incoming commitment monitor update, nodes[1]'s response monitor
+ // updates, and nodes[0]'s held final monitor update.
+ do_commitment_signed_dance(
+ &nodes[0], &nodes[1], &updates.commitment_signed, false, false,
+ );
+ },
+ _ => panic!("Unexpected restart message from node 1: {:?}", ev),
+ }
+ }
+ assert_eq!(restart_scids_0.len(), 2);
+ assert!(restart_scids_0.contains(&scid_a));
+ assert!(restart_scids_0.contains(&scid_b));
+ assert_eq!(restart_scids_1.len(), 2);
+ assert!(restart_scids_1.contains(&scid_a));
+ assert!(restart_scids_1.contains(&scid_b));
+ assert_eq!(startup_fulfill_chan_ids, vec![chan_id_a]);
+ assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
+ assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
+ check_added_monitors(&nodes[0], 0);
+ check_added_monitors(&nodes[1], 0);
+
+ // Receiving the startup-released fulfill gives nodes[0] the payment preimage. That is enough to
+ // emit `PaymentSent`, even though channel B's path-level success still needs its own fulfill.
+ let startup_payment_events = nodes[0].node.get_and_clear_pending_events();
+ assert_eq!(startup_payment_events.len(), 2);
+ let mut saw_startup_payment_sent = false;
+ let mut startup_success_scids = Vec::new();
+ for ev in &startup_payment_events {
+ match ev {
+ Event::PaymentSent {
+ payment_preimage: sent_preimage,
+ payment_hash: sent_hash,
+ amount_msat: sent_amount,
+ fee_paid_msat,
+ ..
+ } => { + assert_eq!(*sent_preimage, payment_preimage); + assert_eq!(*sent_hash, payment_hash); + assert_eq!(*sent_amount, Some(amt_msat)); + assert_eq!(*fee_paid_msat, Some(0)); + saw_startup_payment_sent = true; + }, + Event::PaymentPathSuccessful { payment_hash: Some(path_hash), path, .. } => { + assert_eq!(*path_hash, payment_hash); + assert_eq!(path.hops.len(), 1); + startup_success_scids.push(path.hops[0].short_channel_id); + }, + _ => panic!("Unexpected startup payment event: {:?}", ev), + } + } + assert!(saw_startup_payment_sent); + assert_eq!(startup_success_scids, vec![scid_a]); + + // Handling the claim event runs the event-completion action that releases the remaining + // RAA-blocked monitor update. The startup unblock path already released channel A, so channel B + // is the only fulfill that should be emitted here. + let claim_events = nodes[1].node.get_and_clear_pending_events(); + assert_eq!(claim_events.len(), 1); + match &claim_events[0] { + Event::PaymentClaimed { payment_hash: claimed_hash, amount_msat, htlcs, .. } => { + assert_eq!(*claimed_hash, payment_hash); + assert_eq!(*amount_msat, amt_msat); + assert_eq!(htlcs.len(), 2); + }, + _ => panic!("Unexpected event: {:?}", claim_events[0]), + } + // The `PaymentSent` event above releases the monitor update that nodes[0] held after the final + // channel A startup revocation. + check_added_monitors(&nodes[0], 1); + // Handling `PaymentClaimed` releases channel B's held revocation update and then the fulfill + // that was waiting behind it. + check_added_monitors(&nodes[1], 2); + + // Channel A's fulfill was already sent during startup. The `PaymentClaimed` completion action + // now frees channel B's held fulfill, and no other HTLC update should be bundled with it. + let fulfill_msgs = nodes[1].node.get_and_clear_pending_msg_events(); + assert_eq!(fulfill_msgs.len(), 1); + match &fulfill_msgs[0] { + MessageSendEvent::UpdateHTLCs { node_id, channel_id, updates } => { + assert_eq!(*node_id, node_0_id); + assert_eq!(*channel_id, chan_id_b); + assert_eq!(updates.update_fulfill_htlcs.len(), 1); + assert!(updates.update_add_htlcs.is_empty()); + assert!(updates.update_fail_htlcs.is_empty()); + assert!(updates.update_fail_malformed_htlcs.is_empty()); + assert!(updates.update_fee.is_none()); + for fulfill in &updates.update_fulfill_htlcs { + nodes[0].node.handle_update_fulfill_htlc(node_1_id, fulfill.clone()); + } + // Complete the same commitment handshake for channel B. Here nodes[0]'s final monitor + // update is persisted immediately because `PaymentSent` already ran for channel A. + do_commitment_signed_dance( + &nodes[0], &nodes[1], &updates.commitment_signed, false, false, + ); + }, + _ => panic!("Unexpected fulfill message: {:?}", fulfill_msgs[0]), + } + check_added_monitors(&nodes[1], 0); + assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty()); + assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); + + let final_payment_events = nodes[0].node.get_and_clear_pending_events(); + assert_eq!(final_payment_events.len(), 1); + match &final_payment_events[0] { + Event::PaymentPathSuccessful { payment_hash: Some(path_hash), path, .. 
+ } => {
+ assert_eq!(*path_hash, payment_hash);
+ assert_eq!(path.hops.len(), 1);
+ assert_eq!(path.hops[0].short_channel_id, scid_b);
+ },
+ _ => panic!("Unexpected final payment event: {:?}", final_payment_events[0]),
+ }
+ check_added_monitors(&nodes[0], 0);
+ assert!(nodes[1].node.get_and_clear_pending_events().is_empty());
+ check_added_monitors(&nodes[1], 0);
+
+ // Both MPP parts should have been fulfilled back to nodes[0]. If either channel still has a
+ // pending outbound HTLC, its fulfill remained stuck in nodes[1]'s holding cell after reload.
+ let pending: Vec<_> = nodes[0].node.list_channels().iter()
+ .filter(|channel| channel.channel_id == chan_id_a || channel.channel_id == chan_id_b)
+ .filter(|channel| !channel.pending_outbound_htlcs.is_empty())
+ .map(|channel| channel.channel_id)
+ .collect();
+ assert!(pending.is_empty(), "HTLC fulfills remained stuck on channels {:?}", pending);
+}
+
 fn do_forwarded_payment_no_manager_persistence(use_cs_commitment: bool, claim_htlc: bool, use_intercept: bool) {
 	if !use_cs_commitment { assert!(!claim_htlc); }
 	// If we go to forward a payment, and the ChannelMonitor persistence completes, but the