From d71e323820910db51aa4e2cb544475493f181e47 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Thu, 7 May 2026 13:55:39 -0500 Subject: [PATCH 1/3] channelnotifier: add NotifyEarlyClosedChannelEvent Today NotifyClosedChannelEvent rebuilds its event by round-tripping through FetchClosedChannel, which forces the caller to have already persisted the close summary to the closed-channel bucket. The chain watcher needs to surface a CLOSED_CHANNEL event to RPC subscribers as soon as a coop close spend is first detected on chain, well before the close has reached the required confirmation depth at which the state machine would normally call MarkChannelClosed. In this commit, we add NotifyEarlyClosedChannelEvent, which dispatches a ClosedChannelEvent built from a caller-supplied summary directly through the subscribe server. The summary is expected to carry IsPending=true so subscribers can recognize that the close has not yet been finalized in the database. Two unit tests assert that the new path delivers the supplied summary verbatim and produces exactly one event per call. --- channelnotifier/channelnotifier.go | 17 +++++ channelnotifier/channelnotifier_test.go | 89 +++++++++++++++++++++++++ 2 files changed, 106 insertions(+) diff --git a/channelnotifier/channelnotifier.go b/channelnotifier/channelnotifier.go index e7b815b301a..9a159731545 100644 --- a/channelnotifier/channelnotifier.go +++ b/channelnotifier/channelnotifier.go @@ -186,6 +186,23 @@ func (c *ChannelNotifier) NotifyClosedChannelEvent(chanPoint wire.OutPoint) { } } +// NotifyEarlyClosedChannelEvent dispatches a ClosedChannelEvent built from the +// supplied close summary, without consulting the channel database. This is +// used by the chain watcher to insta-dispatch CLOSED_CHANNEL events to RPC +// subscribers as soon as a coop close is first detected on chain, before the +// async N-conf path has persisted the close in the database. The summary's +// IsPending field will typically be true at this point; callers should set it +// accordingly. +func (c *ChannelNotifier) NotifyEarlyClosedChannelEvent( + summary *channeldb.ChannelCloseSummary) { + + event := ClosedChannelEvent{CloseSummary: summary} + if err := c.ntfnServer.SendUpdate(event); err != nil { + log.Warnf("Unable to send early closed channel update: %v", + err) + } +} + // NotifyFullyResolvedChannelEvent notifies the channelEventNotifier goroutine // that a channel was fully resolved on chain. func (c *ChannelNotifier) NotifyFullyResolvedChannelEvent( diff --git a/channelnotifier/channelnotifier_test.go b/channelnotifier/channelnotifier_test.go index 5dbdb4a4579..233ff127995 100644 --- a/channelnotifier/channelnotifier_test.go +++ b/channelnotifier/channelnotifier_test.go @@ -4,6 +4,8 @@ import ( "testing" "time" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/channeldb" "github.com/stretchr/testify/require" ) @@ -41,3 +43,90 @@ func TestChannelUpdateEvent(t *testing.T) { t.Fatalf("expected to receive channel update event") } } + +// TestNotifyEarlyClosedChannelEvent verifies that the early-dispatch path +// delivers exactly the supplied close summary to subscribers without +// consulting the channel database. This is the path used by the chain watcher +// at first conf to insta-dispatch CLOSED_CHANNEL events for cooperative +// closes, before the close summary is persisted. +func TestNotifyEarlyClosedChannelEvent(t *testing.T) { + t.Parallel() + + // Pass nil for chanDB; the early-dispatch path must not touch it. + ntfnServer := New(nil) + require.NoError(t, ntfnServer.Start()) + t.Cleanup(func() { + require.NoError(t, ntfnServer.Stop()) + }) + + sub, err := ntfnServer.SubscribeChannelEvents() + require.NoError(t, err) + t.Cleanup(sub.Cancel) + + // Build a close summary with IsPending=true to mirror what the chain + // watcher will hand in at first-conf detection. + chanPoint := wire.OutPoint{ + Hash: chainhash.Hash{0x01, 0x02, 0x03}, + Index: 4, + } + summary := &channeldb.ChannelCloseSummary{ + ChanPoint: chanPoint, + CloseType: channeldb.CooperativeClose, + IsPending: true, + } + + ntfnServer.NotifyEarlyClosedChannelEvent(summary) + + select { + case event := <-sub.Updates(): + closedEvent, ok := event.(ClosedChannelEvent) + require.True(t, ok, "expected ClosedChannelEvent, got %T", event) + require.NotNil(t, closedEvent.CloseSummary) + require.True(t, closedEvent.CloseSummary.IsPending, + "early dispatched summary must carry IsPending=true") + require.Equal(t, summary, closedEvent.CloseSummary, + "early dispatched summary must reach subscriber "+ + "verbatim") + + case <-time.After(time.Second): + t.Fatal("expected to receive early closed channel event") + } +} + +// TestNotifyEarlyClosedChannelEventSingleEvent guards against accidental +// re-dispatch: a single early-notify call must produce exactly one event, +// not two (e.g. a fan-out bug between the early and the legacy paths). +func TestNotifyEarlyClosedChannelEventSingleEvent(t *testing.T) { + t.Parallel() + + ntfnServer := New(nil) + require.NoError(t, ntfnServer.Start()) + t.Cleanup(func() { + require.NoError(t, ntfnServer.Stop()) + }) + + sub, err := ntfnServer.SubscribeChannelEvents() + require.NoError(t, err) + t.Cleanup(sub.Cancel) + + summary := &channeldb.ChannelCloseSummary{ + ChanPoint: wire.OutPoint{Index: 7}, + CloseType: channeldb.CooperativeClose, + IsPending: true, + } + ntfnServer.NotifyEarlyClosedChannelEvent(summary) + + // Drain the single expected event. + select { + case <-sub.Updates(): + case <-time.After(time.Second): + t.Fatal("expected to receive early closed channel event") + } + + // Any further read should not produce another event. + select { + case extra := <-sub.Updates(): + t.Fatalf("unexpected second event: %T", extra) + case <-time.After(50 * time.Millisecond): + } +} From 2ecbc538a86938bec3c66b8474f04f9f1889d507 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Thu, 7 May 2026 13:56:01 -0500 Subject: [PATCH 2/3] contractcourt+server: insta-dispatch CLOSED_CHANNEL on first conf PR #10331 introduced a multi-confirmation reorg-aware dispatch in the chain watcher. In production builds CloseConfsForCapacity is at least 3, so the chain watcher waits for three confirmations of a close tx before running dispatchCooperativeClose, MarkChannelClosed, and NotifyClosedChannel. Subscribers of the SubscribeChannelEvents stream that used to receive a CLOSED_CHANNEL event after a single confirmation in v0.20.1 stopped seeing the event entirely on shorter test cycles and were delayed by two extra blocks on longer ones. This is the regression alexbosworth reported on zero-conf channels. The intent behind the original change was to wait three confirmations under the hood for reorg safety while still dispatching a CLOSED_CHANNEL event to RPC subscribers immediately, matching the v0.20.1 surface. That insta-dispatch was wired into peer.WaitForChanToClose for the local CloseChannel response stream but was never extended to the channel-notifier path that drives SubscribeChannelEvents. In this commit, we wire a new optional notifyEarlyCoopClose callback into the chain watcher's processDetectedSpend. The first time a coop close spend is detected on chain, the chain watcher synthesizes a ChannelCloseSummary with IsPending=true and dispatches a CLOSED_CHANNEL event over the channel notifier, no DB round-trip required. The callback is plumbed through ChainArbitratorConfig .NotifyEarlyClosedChannel to the new ChannelNotifier.NotifyEarlyClosedChannelEvent. The summary builder shared with dispatchCooperativeClose is extracted into buildCoopCloseSummary so the early and post-N-conf paths produce equivalent payloads. A coopCloseEarlyDispatched flag on the chain watcher keeps the dispatch idempotent across blockbeat replays of the same spend, and the closeObserver clears it on negativeConfChan so a re-mined or replacement close after a deep reorg re-fires the preliminary event with its own summary. The early-dispatch call sits before the fast-path check so numConfs==1 also fires the early event through the same code path. Suppressing the duplicate notify at MarkChannelClosed time happens inline in the chain_arbitrator MarkChannelClosed callback: after CloseChannel succeeds, NotifyClosedChannel is fired only when the close type is not CooperativeClose. Force, breach, and abandon paths intentionally remain on the existing N-confirmation dispatch contract. --- contractcourt/chain_arbitrator.go | 55 ++++++++---- contractcourt/chain_watcher.go | 136 +++++++++++++++++++++++++----- server.go | 1 + 3 files changed, 156 insertions(+), 36 deletions(-) diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go index eac63cb32d1..9769bc649dc 100644 --- a/contractcourt/chain_arbitrator.go +++ b/contractcourt/chain_arbitrator.go @@ -165,6 +165,15 @@ type ChainArbitratorConfig struct { // will use to notify the ChannelNotifier about a newly closed channel. NotifyClosedChannel func(wire.OutPoint) + // NotifyEarlyClosedChannel is invoked by the chain watcher when a + // cooperative close spend is first detected on chain, before the close + // summary has been persisted to the closed-channel bucket. It allows + // the channel notifier to dispatch a CLOSED_CHANNEL event over RPC at + // the same depth it did before the multi-confirmation reorg-aware + // dispatch was introduced. The follow-up persist + state advance still + // waits for the full required confirmation count. + NotifyEarlyClosedChannel func(*channeldb.ChannelCloseSummary) + // NotifyFullyResolvedChannel is a function closure that the // ChainArbitrator will use to notify the ChannelNotifier about a newly // resolved channel. The main difference to NotifyClosedChannel is that @@ -451,7 +460,19 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel, if err != nil { return err } - c.cfg.NotifyClosedChannel(summary.ChanPoint) + + // For cooperative closes the chain watcher already + // fired a preliminary CLOSED_CHANNEL event over the + // channel notifier when the spend was first detected + // on chain. Suppressing the notify here keeps the + // SubscribeChannelEvents stream emitting a single + // CLOSED_CHANNEL per close, matching the v0.20.1 + // surface. Force/breach closes still notify here + // since they don't take the early-dispatch path. + if summary.CloseType != channeldb.CooperativeClose { + c.cfg.NotifyClosedChannel(summary.ChanPoint) + } + return nil }, IsPendingClose: false, @@ -1145,11 +1166,12 @@ func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error chanPoint, retInfo, ) }, - extractStateNumHint: lnwallet.GetStateNumHint, - auxLeafStore: c.cfg.AuxLeafStore, - auxResolver: c.cfg.AuxResolver, - auxCloser: c.cfg.AuxCloser, - chanCloseConfs: c.cfg.ChannelCloseConfs, + extractStateNumHint: lnwallet.GetStateNumHint, + auxLeafStore: c.cfg.AuxLeafStore, + auxResolver: c.cfg.AuxResolver, + auxCloser: c.cfg.AuxCloser, + chanCloseConfs: c.cfg.ChannelCloseConfs, + notifyEarlyCoopClose: c.cfg.NotifyEarlyClosedChannel, }, ) if err != nil { @@ -1319,16 +1341,17 @@ func (c *ChainArbitrator) loadOpenChannels() error { chainWatcher, err := newChainWatcher( chainWatcherConfig{ - chanState: channel, - notifier: c.cfg.Notifier, - signer: c.cfg.Signer, - isOurAddr: c.cfg.IsOurAddress, - contractBreach: breachClosure, - extractStateNumHint: lnwallet.GetStateNumHint, - auxLeafStore: c.cfg.AuxLeafStore, - auxResolver: c.cfg.AuxResolver, - auxCloser: c.cfg.AuxCloser, - chanCloseConfs: c.cfg.ChannelCloseConfs, + chanState: channel, + notifier: c.cfg.Notifier, + signer: c.cfg.Signer, + isOurAddr: c.cfg.IsOurAddress, + contractBreach: breachClosure, + extractStateNumHint: lnwallet.GetStateNumHint, + auxLeafStore: c.cfg.AuxLeafStore, + auxResolver: c.cfg.AuxResolver, + auxCloser: c.cfg.AuxCloser, + chanCloseConfs: c.cfg.ChannelCloseConfs, + notifyEarlyCoopClose: c.cfg.NotifyEarlyClosedChannel, }, ) if err != nil { diff --git a/contractcourt/chain_watcher.go b/contractcourt/chain_watcher.go index e45bb3dc99b..78f51a17700 100644 --- a/contractcourt/chain_watcher.go +++ b/contractcourt/chain_watcher.go @@ -279,6 +279,16 @@ type chainWatcherConfig struct { // the normal capacity-based scaling. This is only available in // dev/integration builds for testing purposes. chanCloseConfs fn.Option[uint32] + + // notifyEarlyCoopClose, if set, is invoked with a synthesized + // ChannelCloseSummary the first time a cooperative close spend is + // detected on chain. It dispatches a CLOSED_CHANNEL event over the + // channel notifier so RPC subscribers see the close at the same + // block depth they did before the multi-confirmation reorg-aware + // dispatch was introduced. The follow-up state transition (DB persist + // + state machine advance + FULLY_RESOLVED_CHANNEL) still waits for + // the full required confirmation depth via the existing async path. + notifyEarlyCoopClose func(*channeldb.ChannelCloseSummary) } // chainWatcher is a system that's assigned to every active channel. The duty @@ -329,6 +339,13 @@ type chainWatcher struct { // ensure that the outpoint+pkscript pair is confirmed before calling // `RegisterSpendNtfn`. fundingConfirmedNtfn *chainntnfs.ConfirmationEvent + + // coopCloseEarlyDispatched is set when we have already insta-dispatched + // a preliminary CLOSED_CHANNEL event for a coop close upon first spend + // detection. It is cleared on a deep reorg of the close so a re-mined + // or replacement close still re-fires the early event. Only mutated + // from the closeObserver goroutine, so no lock is needed. + coopCloseEarlyDispatched bool } // newChainWatcher returns a new instance of a chainWatcher for a channel given @@ -711,12 +728,24 @@ type spendProcessResult struct { // // For single-confirmation mode (numConfs == 1), it immediately dispatches the // close event and returns empty result. For multi-confirmation mode, it -// registers for confirmations and returns the new pending state. +// registers for confirmations and returns the new pending state. In the +// async path, a coop close also triggers an early CLOSED_CHANNEL event over +// the channel notifier so RPC subscribers see the close at the same depth +// they did before the multi-confirmation reorg-aware dispatch was introduced. func (c *chainWatcher) processDetectedSpend( spend *chainntnfs.SpendDetail, source string, currentPendingSpend *chainntnfs.SpendDetail, currentConfNtfn *chainntnfs.ConfirmationEvent) spendProcessResult { + // For coop close spends, fire a preliminary CLOSED_CHANNEL event over + // the channel notifier as soon as the spend is first detected. This + // runs in both the fast and async paths so SubscribeChannelEvents + // subscribers see the close at the same depth they did before the + // multi-confirmation reorg-aware dispatch was introduced. The + // suppression of the duplicate notify at MarkChannelClosed time is + // handled in chain_arbitrator.go via a CloseType check. + c.maybeDispatchEarlyCoopClose(spend) + // FAST PATH: Single confirmation mode dispatches immediately. if c.handleSpendDispatch(spend, source) { if currentConfNtfn != nil { @@ -746,7 +775,8 @@ func (c *chainWatcher) processDetectedSpend( } } - // Different spend detected. Cancel existing confNtfn. + // Different spend detected (e.g. an RBF replacement). Cancel + // the existing confNtfn so we can re-register for the new tx. log.Warnf("ChannelPoint(%v): detected different spend tx %v, "+ "replacing pending tx %v", c.cfg.chanState.FundingOutpoint, @@ -943,6 +973,11 @@ func (c *chainWatcher) closeObserver() { confNtfn = nil pendingSpend = nil + // Clear the early-dispatch flag so a re-mined or + // replacement coop close re-fires the preliminary + // CLOSED_CHANNEL event with its own close summary. + c.coopCloseEarlyDispatched = false + // Reset the close confirmation height since the spend // was reorged out. err := c.cfg.chanState.ResetCloseConfirmationHeight() @@ -1347,26 +1382,70 @@ func (c *chainWatcher) requiredConfsForSpend() uint32 { }) } -// dispatchCooperativeClose processed a detect cooperative channel closure. -// We'll use the spending transaction to locate our output within the -// transaction, then clean up the database state. We'll also dispatch a -// notification to all subscribers that the channel has been closed in this -// manner. -func (c *chainWatcher) dispatchCooperativeClose(commitSpend *chainntnfs.SpendDetail) error { - broadcastTx := commitSpend.SpendingTx +// isCoopCloseSpend reports whether the supplied spending tx looks like a +// cooperative close. A coop close has a finalized input sequence number +// (either MaxTxInSequenceNum or MaxRBFSequence); regular commitment txns +// carry an obfuscated state hint in the sequence + locktime fields and +// won't match either constant. +func isCoopCloseSpend(spendingTx *wire.MsgTx) bool { + if len(spendingTx.TxIn) == 0 { + return false + } - log.Infof("Cooperative closure for ChannelPoint(%v): %v", - c.cfg.chanState.FundingOutpoint, - lnutils.SpewLogClosure(broadcastTx)) + switch spendingTx.TxIn[0].Sequence { + case wire.MaxTxInSequenceNum: + return true + case mempool.MaxRBFSequence: + return true + } + + return false +} + +// maybeDispatchEarlyCoopClose fires a preliminary CLOSED_CHANNEL event over +// the channel notifier the first time a coop close spend is detected on +// chain. It is a no-op if no early-dispatch callback was wired in, the spend +// is not a coop close, or an early dispatch has already happened for this +// close. The flag is cleared on a deep reorg of the close (in the closeObserver +// negativeConfChan handler) so a re-mined or replacement close re-fires. +func (c *chainWatcher) maybeDispatchEarlyCoopClose( + spend *chainntnfs.SpendDetail) { + + if c.coopCloseEarlyDispatched { + return + } + if c.cfg.notifyEarlyCoopClose == nil { + return + } + if !isCoopCloseSpend(spend.SpendingTx) { + return + } + + summary := c.buildCoopCloseSummary(spend) + + log.Infof("ChannelPoint(%v): dispatching early CLOSED_CHANNEL "+ + "event for coop close tx %v at height %d", + c.cfg.chanState.FundingOutpoint, spend.SpenderTxHash, + spend.SpendingHeight) + + c.cfg.notifyEarlyCoopClose(summary) + c.coopCloseEarlyDispatched = true +} - // If the input *is* final, then we'll check to see which output is - // ours. +// buildCoopCloseSummary constructs a ChannelCloseSummary for a cooperative +// close from the supplied spend detail. The summary is returned with +// IsPending=true; the channel arbitrator's MarkChannelClosed callback flips +// this to false after the close reaches the required confirmation depth. This +// helper is shared between the early insta-dispatch path (first conf, no DB +// persist) and the post-N-conf dispatch path so both surfaces produce +// equivalent summaries. +func (c *chainWatcher) buildCoopCloseSummary( + commitSpend *chainntnfs.SpendDetail) *channeldb.ChannelCloseSummary { + + broadcastTx := commitSpend.SpendingTx localAmt := c.toSelfAmount(broadcastTx) - // Once this is known, we'll mark the state as fully closed in the - // database. For cooperative closes, we wait for a confirmation depth - // determined by channel capacity before dispatching this event. - closeSummary := &channeldb.ChannelCloseSummary{ + summary := &channeldb.ChannelCloseSummary{ ChanPoint: c.cfg.chanState.FundingOutpoint, ChainHash: c.cfg.chanState.ChainHash, ClosingTXID: *commitSpend.SpenderTxHash, @@ -1388,9 +1467,26 @@ func (c *chainWatcher) dispatchCooperativeClose(commitSpend *chainntnfs.SpendDet log.Errorf("ChannelPoint(%v): unable to create channel sync "+ "message: %v", c.cfg.chanState.FundingOutpoint, err) } else { - closeSummary.LastChanSyncMsg = chanSync + summary.LastChanSyncMsg = chanSync } + return summary +} + +// dispatchCooperativeClose processed a detect cooperative channel closure. +// We'll use the spending transaction to locate our output within the +// transaction, then clean up the database state. We'll also dispatch a +// notification to all subscribers that the channel has been closed in this +// manner. +func (c *chainWatcher) dispatchCooperativeClose(commitSpend *chainntnfs.SpendDetail) error { + broadcastTx := commitSpend.SpendingTx + + log.Infof("Cooperative closure for ChannelPoint(%v): %v", + c.cfg.chanState.FundingOutpoint, + lnutils.SpewLogClosure(broadcastTx)) + + closeSummary := c.buildCoopCloseSummary(commitSpend) + // Create a summary of all the information needed to handle the // cooperative closure. closeInfo := &CooperativeCloseInfo{ @@ -1399,7 +1495,7 @@ func (c *chainWatcher) dispatchCooperativeClose(commitSpend *chainntnfs.SpendDet // If we have an aux closer, finalize the cooperative close now that // it's confirmed. - err = fn.MapOptionZ( + err := fn.MapOptionZ( c.cfg.auxCloser, func(aux AuxChanCloser) error { return c.finalizeCoopClose(aux, broadcastTx) }, diff --git a/server.go b/server.go index 83131c6f2b3..e5307b2416a 100644 --- a/server.go +++ b/server.go @@ -1404,6 +1404,7 @@ func newServer(ctx context.Context, cfg *Config, listenAddrs []net.Addr, Sweeper: s.sweeper, Registry: s.invoices, NotifyClosedChannel: s.channelNotifier.NotifyClosedChannelEvent, + NotifyEarlyClosedChannel: s.channelNotifier.NotifyEarlyClosedChannelEvent, NotifyFullyResolvedChannel: s.channelNotifier.NotifyFullyResolvedChannelEvent, OnionProcessor: s.sphinxPayment, PaymentsExpirationGracePeriod: cfg.PaymentsExpirationGracePeriod, From 82c17a532b6badc8c7d288403c56b880ca4f94e0 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Thu, 7 May 2026 13:56:21 -0500 Subject: [PATCH 3/3] contractcourt+itest: tests for coop close insta-dispatch In this commit, we add three focused unit tests in contractcourt plus an itest that exercises the regression end-to-end. The chain watcher harness gains an opt-in early-dispatch capture that records every notifyEarlyCoopClose invocation so tests can assert how many fired and what summaries they carried. On top of that: TestEarlyDispatchCoopClose verifies the headline behavior. An async-path coop close fires exactly one early dispatch with IsPending=true and the post-N-conf flow still produces the regular CooperativeCloseInfo downstream. TestEarlyDispatchForceCloseNotInvoked guards the carve-out: force closes never fire the early dispatch since their CLOSED_CHANNEL event timing is intentionally unchanged. TestEarlyDispatchReorgRefiresOnReReplacement nails down the reorg path. Once a deep reorg removes the close, the early-dispatch flag is cleared and the next coop close re-fires the early event with its own summary, so a subscriber observes each distinct close attempt. testZeroConfCoopCloseSubscribeEvents brings up a zero-conf channel between Alice and Bob with --dev.force-channel-close-confs=3 so the chain watcher takes the async multi-confirmation path. Alice subscribes to channel events, initiates a cooperative close, and the test asserts that CLOSED_CHANNEL fires after only one confirmation of the close tx (not after the full three) and that FULLY_RESOLVED_CHANNEL arrives once the close has reached three confirmations. A quiet-window assertion at the end verifies that exactly one CLOSED_CHANNEL event is delivered. If the suppression in MarkChannelClosed broke and let it re-fire NotifyClosedChannel at N confs, this assertion would catch the duplicate. --- .../chain_watcher_early_dispatch_test.go | 114 ++++++++ contractcourt/chain_watcher_test_harness.go | 120 +++++++-- itest/list_on_test.go | 4 + itest/lnd_zero_conf_close_event_test.go | 243 ++++++++++++++++++ 4 files changed, 466 insertions(+), 15 deletions(-) create mode 100644 contractcourt/chain_watcher_early_dispatch_test.go create mode 100644 itest/lnd_zero_conf_close_event_test.go diff --git a/contractcourt/chain_watcher_early_dispatch_test.go b/contractcourt/chain_watcher_early_dispatch_test.go new file mode 100644 index 00000000000..22f6fdcc4a8 --- /dev/null +++ b/contractcourt/chain_watcher_early_dispatch_test.go @@ -0,0 +1,114 @@ +package contractcourt + +import ( + "testing" + "time" + + "github.com/lightningnetwork/lnd/channeldb" + "github.com/stretchr/testify/require" +) + +// TestEarlyDispatchCoopClose verifies the headline behavior: when a +// cooperative close spend is first detected on chain in the async path +// (numConfs > 1), the chain watcher fires the early-notify callback exactly +// once with a summary that carries IsPending=true. The full N-conf flow +// still completes normally and produces the regular CooperativeCloseInfo +// downstream. +func TestEarlyDispatchCoopClose(t *testing.T) { + t.Parallel() + + harness := newChainWatcherTestHarness( + t, withRequiredConfs(3), withEarlyCoopCloseCapture(), + ) + + tx := harness.createCoopCloseTx(5000) + + harness.sendSpend(tx) + harness.waitForConfRegistration() + + // The early-dispatch callback must have fired exactly once with a + // preliminary close summary that identifies the right tx, channel + // point, and close type. + harness.waitForEarlyCoopClose(1, time.Second) + require.Equal(t, 1, harness.earlyCoopCloseCount(), + "exactly one early dispatch expected on first spend detection") + + earlySummary := harness.earlyCoopCloseAt(0) + require.True(t, earlySummary.IsPending, + "early dispatched summary must have IsPending=true") + require.Equal(t, channeldb.CooperativeClose, earlySummary.CloseType) + require.Equal(t, tx.TxHash(), earlySummary.ClosingTXID) + require.Equal(t, harness.aliceChannel.State().FundingOutpoint, + earlySummary.ChanPoint) + + // Drive the close to N confs so the regular post-N-conf dispatch + // path also completes; the resulting CooperativeCloseInfo must + // reference the same tx. + harness.mineBlocks(1) + harness.confirmTx(tx, harness.currentHeight) + + closeInfo := harness.waitForCoopClose(5 * time.Second) + harness.assertCoopCloseTx(closeInfo, tx) +} + +// TestEarlyDispatchForceCloseNotInvoked verifies that force-close spends do +// NOT trigger the early-dispatch callback. Force-close paths intentionally +// stay on the N-confirmation dispatch contract; their CLOSED_CHANNEL event +// fires from the channel arbitrator's MarkChannelClosed callback at N +// confs. +func TestEarlyDispatchForceCloseNotInvoked(t *testing.T) { + t.Parallel() + + harness := newChainWatcherTestHarness( + t, withRequiredConfs(3), withEarlyCoopCloseCapture(), + ) + + tx := harness.createRemoteForceCloseTx() + harness.sendSpend(tx) + harness.waitForConfRegistration() + + // Wait briefly so any spurious early dispatch would have landed. + time.Sleep(100 * time.Millisecond) + require.Equal(t, 0, harness.earlyCoopCloseCount(), + "remote force close must not trigger early dispatch") +} + +// TestEarlyDispatchReorgRefiresOnReReplacement verifies the reorg recovery +// path: once a deep reorg removes the close, the early-dispatch flag is +// cleared, and the next coop close re-fires the early event with its own +// summary. This is the contract that lets a subscriber observe each +// distinct close attempt rather than only the first one. +func TestEarlyDispatchReorgRefiresOnReReplacement(t *testing.T) { + t.Parallel() + + harness := newChainWatcherTestHarness( + t, withRequiredConfs(3), withEarlyCoopCloseCapture(), + ) + + tx1 := harness.createCoopCloseTx(5000) + tx2 := harness.createCoopCloseTx(4900) + + // First close detected → early dispatch #1. + harness.sendSpend(tx1) + harness.waitForConfRegistration() + harness.waitForEarlyCoopClose(1, time.Second) + + // Reorg flushes the conf ntfn and resets the flag. + harness.triggerReorg(tx1, 2) + harness.waitForSpendRegistration() + + // Replacement close detected → early dispatch #2 with the new tx. + harness.sendSpend(tx2) + harness.waitForConfRegistration() + harness.waitForEarlyCoopClose(2, 2*time.Second) + + require.Equal(t, 2, harness.earlyCoopCloseCount(), + "reorg + replacement close must re-fire the early dispatch") + + first := harness.earlyCoopCloseAt(0) + second := harness.earlyCoopCloseAt(1) + require.Equal(t, tx1.TxHash(), first.ClosingTXID, + "first early dispatch must reference tx1") + require.Equal(t, tx2.TxHash(), second.ClosingTXID, + "second early dispatch must reference the replacement tx2") +} diff --git a/contractcourt/chain_watcher_test_harness.go b/contractcourt/chain_watcher_test_harness.go index 09ab0359249..e67b84abc95 100644 --- a/contractcourt/chain_watcher_test_harness.go +++ b/contractcourt/chain_watcher_test_harness.go @@ -1,6 +1,7 @@ package contractcourt import ( + "sync" "testing" "time" @@ -46,6 +47,14 @@ type chainWatcherTestHarness struct { // blockbeatProcessed is a channel that signals when a blockbeat has // been processed. blockbeatProcessed chan struct{} + + // earlyCoopCloseMu guards earlyCoopCloseSummaries. + earlyCoopCloseMu sync.Mutex + + // earlyCoopCloseSummaries records every invocation of the + // notifyEarlyCoopClose callback when captureEarlyCoopClose was + // enabled. Tests assert against length and contents. + earlyCoopCloseSummaries []*channeldb.ChannelCloseSummary } // mockChainNotifier extends the standard mock with additional channels for @@ -136,6 +145,11 @@ type harnessOpt func(*harnessConfig) // harnessConfig holds configuration for the test harness. type harnessConfig struct { requiredConfs fn.Option[uint32] + + // captureEarlyCoopClose, when true, wires a notifyEarlyCoopClose + // callback into the chain watcher that records each invocation onto + // the harness's earlyCoopCloseSummaries slice for assertions. + captureEarlyCoopClose bool } // withRequiredConfs sets the number of confirmations required for channel @@ -146,6 +160,15 @@ func withRequiredConfs(confs uint32) harnessOpt { } } +// withEarlyCoopCloseCapture enables recording of every invocation of the +// chain watcher's notifyEarlyCoopClose callback on the harness so tests can +// assert when, how often, and with which summary the early-dispatch fires. +func withEarlyCoopCloseCapture() harnessOpt { + return func(cfg *harnessConfig) { + cfg.captureEarlyCoopClose = true + } +} + // newChainWatcherTestHarness creates a new test harness for chain watcher // tests. func newChainWatcherTestHarness(t *testing.T, @@ -194,13 +217,38 @@ func newChainWatcherTestHarnessFromReporter(t *testing.T, spendRegistered: make(chan struct{}, 10), } + harness := &chainWatcherTestHarness{ + t: reporter, + aliceChannel: aliceChannel, + bobChannel: bobChannel, + notifier: notifier, + currentHeight: 100, + blockbeatProcessed: make(chan struct{}), + } + + // If the test wants to observe early-dispatch invocations, install + // a callback that records each summary onto the harness. + var notifyEarlyCoopClose func(*channeldb.ChannelCloseSummary) + if cfg.captureEarlyCoopClose { + notifyEarlyCoopClose = func( + s *channeldb.ChannelCloseSummary) { + + harness.earlyCoopCloseMu.Lock() + harness.earlyCoopCloseSummaries = append( + harness.earlyCoopCloseSummaries, s, + ) + harness.earlyCoopCloseMu.Unlock() + } + } + // Create chain watcher. chainWatcher, err := newChainWatcher(chainWatcherConfig{ - chanState: aliceChannel.State(), - notifier: notifier, - signer: aliceChannel.Signer, - extractStateNumHint: lnwallet.GetStateNumHint, - chanCloseConfs: cfg.requiredConfs, + chanState: aliceChannel.State(), + notifier: notifier, + signer: aliceChannel.Signer, + extractStateNumHint: lnwallet.GetStateNumHint, + chanCloseConfs: cfg.requiredConfs, + notifyEarlyCoopClose: notifyEarlyCoopClose, contractBreach: func( retInfo *lnwallet.BreachRetribution, ) error { @@ -222,16 +270,8 @@ func newChainWatcherTestHarnessFromReporter(t *testing.T, // Subscribe to channel events. chanEvents := chainWatcher.SubscribeChannelEvents() - harness := &chainWatcherTestHarness{ - t: reporter, - aliceChannel: aliceChannel, - bobChannel: bobChannel, - chainWatcher: chainWatcher, - notifier: notifier, - chanEvents: chanEvents, - currentHeight: 100, - blockbeatProcessed: make(chan struct{}), - } + harness.chainWatcher = chainWatcher + harness.chanEvents = chanEvents // Wait for the initial spend registration that happens in Start(). harness.waitForSpendRegistration() @@ -407,6 +447,56 @@ func (h *chainWatcherTestHarness) mineBlocks(n int32) { h.currentHeight += n } +// earlyCoopCloseCount returns how many times the early-dispatch callback has +// fired since the harness started. +func (h *chainWatcherTestHarness) earlyCoopCloseCount() int { + h.earlyCoopCloseMu.Lock() + defer h.earlyCoopCloseMu.Unlock() + + return len(h.earlyCoopCloseSummaries) +} + +// earlyCoopCloseAt returns the early-dispatch summary recorded at the given +// index. The harness fails the test if the index is out of range. +func (h *chainWatcherTestHarness) earlyCoopCloseAt( + idx int) *channeldb.ChannelCloseSummary { + + h.earlyCoopCloseMu.Lock() + defer h.earlyCoopCloseMu.Unlock() + + if idx >= len(h.earlyCoopCloseSummaries) { + h.t.Fatalf("expected early-dispatch index %d, only %d "+ + "summaries recorded", idx, + len(h.earlyCoopCloseSummaries)) + } + + return h.earlyCoopCloseSummaries[idx] +} + +// waitForEarlyCoopClose blocks until at least the supplied count of +// early-dispatch invocations have been recorded, or the timeout elapses. +func (h *chainWatcherTestHarness) waitForEarlyCoopClose(want int, + timeout time.Duration) { + + h.t.Helper() + + deadline := time.Now().Add(timeout) + for { + if h.earlyCoopCloseCount() >= want { + return + } + if time.Now().After(deadline) { + h.t.Fatalf("expected %d early-dispatch invocations, "+ + "got %d after %v", want, + h.earlyCoopCloseCount(), timeout) + + return + } + + time.Sleep(10 * time.Millisecond) + } +} + // waitForCoopClose waits for a cooperative close event and returns it. func (h *chainWatcherTestHarness) waitForCoopClose( timeout time.Duration) *CooperativeCloseInfo { diff --git a/itest/list_on_test.go b/itest/list_on_test.go index c8e4244e7e3..5b9a7521fbd 100644 --- a/itest/list_on_test.go +++ b/itest/list_on_test.go @@ -463,6 +463,10 @@ var allTestCases = []*lntest.TestCase{ Name: "zero conf channel open", TestFunc: testZeroConfChannelOpen, }, + { + Name: "zero conf coop close subscribe events", + TestFunc: testZeroConfCoopCloseSubscribeEvents, + }, { Name: "option scid alias", TestFunc: testOptionScidAlias, diff --git a/itest/lnd_zero_conf_close_event_test.go b/itest/lnd_zero_conf_close_event_test.go new file mode 100644 index 00000000000..fa824b8d31f --- /dev/null +++ b/itest/lnd_zero_conf_close_event_test.go @@ -0,0 +1,243 @@ +package itest + +import ( + "time" + + "github.com/btcsuite/btcd/btcutil" + "github.com/lightningnetwork/lnd/lnrpc" + "github.com/lightningnetwork/lnd/lntest" + "github.com/lightningnetwork/lnd/lntest/rpc" + "github.com/lightningnetwork/lnd/lntest/wait" + "github.com/stretchr/testify/require" +) + +// testZeroConfCoopCloseSubscribeEvents exercises the regression that was +// reported when production builds switched to a multi-confirmation +// reorg-aware close dispatch: SubscribeChannelEvents stopped emitting +// CLOSED_CHANNEL on cooperative closes for zero-conf channels until the +// close had reached the full confirmation depth, instead of firing at first +// detection like v0.20.1. +// +// The fix wires an early-dispatch callback into the chain watcher that fires +// a preliminary CLOSED_CHANNEL event over the channel notifier as soon as a +// coop close spend lands on chain. This test asserts: +// +// 1. CLOSED_CHANNEL fires on the SubscribeChannelEvents stream after only +// the first confirmation of the close tx (not after the full N=3 +// depth required by --dev.force-channel-close-confs=3). +// 2. FULLY_RESOLVED_CHANNEL fires once the close has reached N confs and +// the channel arbitrator has finished its resolution flow. +// 3. Exactly one CLOSED_CHANNEL is delivered — the suppression logic in +// the channel arbitrator drops the duplicate that would otherwise fire +// from MarkChannelClosed at N confs. +func testZeroConfCoopCloseSubscribeEvents(ht *lntest.HarnessTest) { + // Force coop close to require 3 confs so we exercise the async path + // in the chain watcher (the same path production hits via + // CloseConfsForCapacity). + const requiredConfs = 3 + + // Zero-conf channels need option-scid-alias and anchors; force-confs + // is what flips us out of the numConfs==1 fast-path so we can verify + // the early dispatch is what surfaces the CLOSED_CHANNEL event. + nodeArgs := []string{ + "--protocol.option-scid-alias", + "--protocol.zero-conf", + "--protocol.anchors", + "--dev.force-channel-close-confs=3", + } + + alice := ht.NewNode("Alice", nodeArgs) + bob := ht.NewNode("Bob", nodeArgs) + + ht.FundCoins(btcutil.SatoshiPerBitcoin, alice) + ht.EnsureConnected(alice, bob) + + // A channel acceptor on Bob is needed to allow the zero-conf + // negotiation to succeed. + acceptStream, cancelAcceptor := bob.RPC.ChannelAcceptor() + go acceptChannel(ht.T, true, acceptStream) + + const chanAmt = btcutil.Amount(1_000_000) + openParams := lntest.OpenChannelParams{ + Amt: chanAmt, + Private: true, + CommitmentType: lnrpc.CommitmentType_ANCHORS, + ZeroConf: true, + } + stream := ht.OpenChannelAssertStream(alice, bob, openParams) + cancelAcceptor() + + // Wait for the channel-open update — for zero-conf this arrives + // without any blocks needing to be mined. + chanPoint := ht.WaitForChannelOpenEvent(stream) + ht.AssertChannelInGraph(alice, chanPoint) + ht.AssertChannelInGraph(bob, chanPoint) + + // Subscribe Alice to channel events BEFORE we initiate the close so + // we capture the full close lifecycle on the wire. + chanSub := alice.RPC.SubscribeChannelEvents() + + // Alice initiates the cooperative close; NoWait so the closing tx + // just lands in the mempool. + closeStream, _ := ht.CloseChannelAssertPending(alice, chanPoint, false) + + // Mine a single block so the close tx confirms once. With the fix in + // place, the chain watcher's processDetectedSpend should + // insta-dispatch CLOSED_CHANNEL over the notifier as soon as the + // spend lands, even though the async path is still waiting on two + // more confs before driving MarkChannelClosed. + ht.MineBlocksAndAssertNumTxes(1, 1) + + closedSeen := waitForChannelEventOfType( + ht, chanSub, + lnrpc.ChannelEventUpdate_CLOSED_CHANNEL, + ) + require.NotNil( + ht, closedSeen, + "CLOSED_CHANNEL must fire after the first conf of the "+ + "close tx (regression: production was waiting for "+ + "the full 3-conf depth)", + ) + + // The CLOSED_CHANNEL summary must reflect a cooperative close + // initiated by the local node. + closedSummary := closedSeen.GetClosedChannel() + require.NotNil(ht, closedSummary, + "CLOSED_CHANNEL update must carry a close summary") + require.Equal(ht, + lnrpc.ChannelCloseSummary_COOPERATIVE_CLOSE, + closedSummary.CloseType, + ) + require.Equal(ht, + lnrpc.Initiator_INITIATOR_LOCAL, + closedSummary.CloseInitiator, + ) + + // Mine the remaining confs needed to take the close to its final + // resolution state. + ht.MineBlocksAndAssertNumTxes(requiredConfs-1, 0) + + // Drain the close-channel client stream so the test cleanup path + // doesn't hang on it. + go func() { + for { + if _, err := closeStream.Recv(); err != nil { + return + } + } + }() + + // FULLY_RESOLVED_CHANNEL must arrive after the close advances + // through the channel arbitrator at full N confs. + resolvedSeen := waitForChannelEventOfType( + ht, chanSub, + lnrpc.ChannelEventUpdate_FULLY_RESOLVED_CHANNEL, + ) + require.NotNil(ht, resolvedSeen, + "FULLY_RESOLVED_CHANNEL must fire after the close reaches "+ + "the required confirmation depth") + + // Crucially: no second CLOSED_CHANNEL event should have arrived + // between the early one and the FULLY_RESOLVED_CHANNEL. The + // channel-arbitrator-side suppression drops the duplicate that + // MarkChannelClosed would otherwise emit. We give the stream a brief + // quiet window to surface any straggling events. + assertNoMoreClosedEvents(ht, chanSub, 500*time.Millisecond) +} + +// waitForChannelEventOfType drains the channel events subscription until +// one of the supplied type lands or the harness's default timeout elapses. +// Other event types (PENDING_OPEN, OPEN, ACTIVE, INACTIVE, CHANNEL_UPDATE) +// are expected during a normal close flow and must be tolerated rather than +// fail the test. +func waitForChannelEventOfType(ht *lntest.HarnessTest, + sub rpc.ChannelEventsClient, + want lnrpc.ChannelEventUpdate_UpdateType) *lnrpc.ChannelEventUpdate { + + type result struct { + event *lnrpc.ChannelEventUpdate + err error + } + + results := make(chan result, 1) + deadline := time.After(wait.DefaultTimeout) + + go func() { + for { + ev, err := sub.Recv() + if err != nil { + results <- result{err: err} + return + } + if ev.Type == want { + results <- result{event: ev} + return + } + } + }() + + select { + case r := <-results: + require.NoErrorf(ht, r.err, + "error from channel event stream while waiting "+ + "for %v", want) + + return r.event + + case <-deadline: + ht.Fatalf("timed out waiting for channel event %v", want) + return nil + } +} + +// assertNoMoreClosedEvents reads from the subscription for the supplied +// quiet window and fails the test if a CLOSED_CHANNEL event is observed. +// FULLY_RESOLVED_CHANNEL has already been observed by this point so a +// second CLOSED_CHANNEL would represent the duplicate-notify regression +// that the suppression logic is meant to prevent. +func assertNoMoreClosedEvents(ht *lntest.HarnessTest, + sub rpc.ChannelEventsClient, window time.Duration) { + + done := make(chan struct{}) + defer close(done) + + errs := make(chan error, 1) + dups := make(chan *lnrpc.ChannelEventUpdate, 1) + + go func() { + for { + ev, err := sub.Recv() + if err != nil { + select { + case errs <- err: + case <-done: + } + return + } + if ev.Type == + lnrpc.ChannelEventUpdate_CLOSED_CHANNEL { + + select { + case dups <- ev: + case <-done: + } + return + } + } + }() + + select { + case dup := <-dups: + ht.Fatalf("unexpected duplicate CLOSED_CHANNEL event: %v", + dup) + + case err := <-errs: + // Stream EOF or context cancel is fine; we just want the + // quiet window to elapse without a duplicate. + _ = err + + case <-time.After(window): + // Quiet window elapsed without a duplicate. Pass. + } +} +