diff --git a/channelnotifier/channelnotifier.go b/channelnotifier/channelnotifier.go index e7b815b301a..9a159731545 100644 --- a/channelnotifier/channelnotifier.go +++ b/channelnotifier/channelnotifier.go @@ -186,6 +186,23 @@ func (c *ChannelNotifier) NotifyClosedChannelEvent(chanPoint wire.OutPoint) { } } +// NotifyEarlyClosedChannelEvent dispatches a ClosedChannelEvent built from the +// supplied close summary, without consulting the channel database. This is +// used by the chain watcher to insta-dispatch CLOSED_CHANNEL events to RPC +// subscribers as soon as a coop close is first detected on chain, before the +// async N-conf path has persisted the close in the database. The summary's +// IsPending field will typically be true at this point; callers should set it +// accordingly. +func (c *ChannelNotifier) NotifyEarlyClosedChannelEvent( + summary *channeldb.ChannelCloseSummary) { + + event := ClosedChannelEvent{CloseSummary: summary} + if err := c.ntfnServer.SendUpdate(event); err != nil { + log.Warnf("Unable to send early closed channel update: %v", + err) + } +} + // NotifyFullyResolvedChannelEvent notifies the channelEventNotifier goroutine // that a channel was fully resolved on chain. func (c *ChannelNotifier) NotifyFullyResolvedChannelEvent( diff --git a/channelnotifier/channelnotifier_test.go b/channelnotifier/channelnotifier_test.go index 5dbdb4a4579..233ff127995 100644 --- a/channelnotifier/channelnotifier_test.go +++ b/channelnotifier/channelnotifier_test.go @@ -4,6 +4,8 @@ import ( "testing" "time" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/channeldb" "github.com/stretchr/testify/require" ) @@ -41,3 +43,90 @@ func TestChannelUpdateEvent(t *testing.T) { t.Fatalf("expected to receive channel update event") } } + +// TestNotifyEarlyClosedChannelEvent verifies that the early-dispatch path +// delivers exactly the supplied close summary to subscribers without +// consulting the channel database. This is the path used by the chain watcher +// at first conf to insta-dispatch CLOSED_CHANNEL events for cooperative +// closes, before the close summary is persisted. +func TestNotifyEarlyClosedChannelEvent(t *testing.T) { + t.Parallel() + + // Pass nil for chanDB; the early-dispatch path must not touch it. + ntfnServer := New(nil) + require.NoError(t, ntfnServer.Start()) + t.Cleanup(func() { + require.NoError(t, ntfnServer.Stop()) + }) + + sub, err := ntfnServer.SubscribeChannelEvents() + require.NoError(t, err) + t.Cleanup(sub.Cancel) + + // Build a close summary with IsPending=true to mirror what the chain + // watcher will hand in at first-conf detection. + chanPoint := wire.OutPoint{ + Hash: chainhash.Hash{0x01, 0x02, 0x03}, + Index: 4, + } + summary := &channeldb.ChannelCloseSummary{ + ChanPoint: chanPoint, + CloseType: channeldb.CooperativeClose, + IsPending: true, + } + + ntfnServer.NotifyEarlyClosedChannelEvent(summary) + + select { + case event := <-sub.Updates(): + closedEvent, ok := event.(ClosedChannelEvent) + require.True(t, ok, "expected ClosedChannelEvent, got %T", event) + require.NotNil(t, closedEvent.CloseSummary) + require.True(t, closedEvent.CloseSummary.IsPending, + "early dispatched summary must carry IsPending=true") + require.Equal(t, summary, closedEvent.CloseSummary, + "early dispatched summary must reach subscriber "+ + "verbatim") + + case <-time.After(time.Second): + t.Fatal("expected to receive early closed channel event") + } +} + +// TestNotifyEarlyClosedChannelEventSingleEvent guards against accidental +// re-dispatch: a single early-notify call must produce exactly one event, +// not two (e.g. a fan-out bug between the early and the legacy paths). +func TestNotifyEarlyClosedChannelEventSingleEvent(t *testing.T) { + t.Parallel() + + ntfnServer := New(nil) + require.NoError(t, ntfnServer.Start()) + t.Cleanup(func() { + require.NoError(t, ntfnServer.Stop()) + }) + + sub, err := ntfnServer.SubscribeChannelEvents() + require.NoError(t, err) + t.Cleanup(sub.Cancel) + + summary := &channeldb.ChannelCloseSummary{ + ChanPoint: wire.OutPoint{Index: 7}, + CloseType: channeldb.CooperativeClose, + IsPending: true, + } + ntfnServer.NotifyEarlyClosedChannelEvent(summary) + + // Drain the single expected event. + select { + case <-sub.Updates(): + case <-time.After(time.Second): + t.Fatal("expected to receive early closed channel event") + } + + // Any further read should not produce another event. + select { + case extra := <-sub.Updates(): + t.Fatalf("unexpected second event: %T", extra) + case <-time.After(50 * time.Millisecond): + } +} diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go index eac63cb32d1..9769bc649dc 100644 --- a/contractcourt/chain_arbitrator.go +++ b/contractcourt/chain_arbitrator.go @@ -165,6 +165,15 @@ type ChainArbitratorConfig struct { // will use to notify the ChannelNotifier about a newly closed channel. NotifyClosedChannel func(wire.OutPoint) + // NotifyEarlyClosedChannel is invoked by the chain watcher when a + // cooperative close spend is first detected on chain, before the close + // summary has been persisted to the closed-channel bucket. It allows + // the channel notifier to dispatch a CLOSED_CHANNEL event over RPC at + // the same depth it did before the multi-confirmation reorg-aware + // dispatch was introduced. The follow-up persist + state advance still + // waits for the full required confirmation count. + NotifyEarlyClosedChannel func(*channeldb.ChannelCloseSummary) + // NotifyFullyResolvedChannel is a function closure that the // ChainArbitrator will use to notify the ChannelNotifier about a newly // resolved channel. The main difference to NotifyClosedChannel is that @@ -451,7 +460,19 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel, if err != nil { return err } - c.cfg.NotifyClosedChannel(summary.ChanPoint) + + // For cooperative closes the chain watcher already + // fired a preliminary CLOSED_CHANNEL event over the + // channel notifier when the spend was first detected + // on chain. Suppressing the notify here keeps the + // SubscribeChannelEvents stream emitting a single + // CLOSED_CHANNEL per close, matching the v0.20.1 + // surface. Force/breach closes still notify here + // since they don't take the early-dispatch path. + if summary.CloseType != channeldb.CooperativeClose { + c.cfg.NotifyClosedChannel(summary.ChanPoint) + } + return nil }, IsPendingClose: false, @@ -1145,11 +1166,12 @@ func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error chanPoint, retInfo, ) }, - extractStateNumHint: lnwallet.GetStateNumHint, - auxLeafStore: c.cfg.AuxLeafStore, - auxResolver: c.cfg.AuxResolver, - auxCloser: c.cfg.AuxCloser, - chanCloseConfs: c.cfg.ChannelCloseConfs, + extractStateNumHint: lnwallet.GetStateNumHint, + auxLeafStore: c.cfg.AuxLeafStore, + auxResolver: c.cfg.AuxResolver, + auxCloser: c.cfg.AuxCloser, + chanCloseConfs: c.cfg.ChannelCloseConfs, + notifyEarlyCoopClose: c.cfg.NotifyEarlyClosedChannel, }, ) if err != nil { @@ -1319,16 +1341,17 @@ func (c *ChainArbitrator) loadOpenChannels() error { chainWatcher, err := newChainWatcher( chainWatcherConfig{ - chanState: channel, - notifier: c.cfg.Notifier, - signer: c.cfg.Signer, - isOurAddr: c.cfg.IsOurAddress, - contractBreach: breachClosure, - extractStateNumHint: lnwallet.GetStateNumHint, - auxLeafStore: c.cfg.AuxLeafStore, - auxResolver: c.cfg.AuxResolver, - auxCloser: c.cfg.AuxCloser, - chanCloseConfs: c.cfg.ChannelCloseConfs, + chanState: channel, + notifier: c.cfg.Notifier, + signer: c.cfg.Signer, + isOurAddr: c.cfg.IsOurAddress, + contractBreach: breachClosure, + extractStateNumHint: lnwallet.GetStateNumHint, + auxLeafStore: c.cfg.AuxLeafStore, + auxResolver: c.cfg.AuxResolver, + auxCloser: c.cfg.AuxCloser, + chanCloseConfs: c.cfg.ChannelCloseConfs, + notifyEarlyCoopClose: c.cfg.NotifyEarlyClosedChannel, }, ) if err != nil { diff --git a/contractcourt/chain_watcher.go b/contractcourt/chain_watcher.go index e45bb3dc99b..78f51a17700 100644 --- a/contractcourt/chain_watcher.go +++ b/contractcourt/chain_watcher.go @@ -279,6 +279,16 @@ type chainWatcherConfig struct { // the normal capacity-based scaling. This is only available in // dev/integration builds for testing purposes. chanCloseConfs fn.Option[uint32] + + // notifyEarlyCoopClose, if set, is invoked with a synthesized + // ChannelCloseSummary the first time a cooperative close spend is + // detected on chain. It dispatches a CLOSED_CHANNEL event over the + // channel notifier so RPC subscribers see the close at the same + // block depth they did before the multi-confirmation reorg-aware + // dispatch was introduced. The follow-up state transition (DB persist + // + state machine advance + FULLY_RESOLVED_CHANNEL) still waits for + // the full required confirmation depth via the existing async path. + notifyEarlyCoopClose func(*channeldb.ChannelCloseSummary) } // chainWatcher is a system that's assigned to every active channel. The duty @@ -329,6 +339,13 @@ type chainWatcher struct { // ensure that the outpoint+pkscript pair is confirmed before calling // `RegisterSpendNtfn`. fundingConfirmedNtfn *chainntnfs.ConfirmationEvent + + // coopCloseEarlyDispatched is set when we have already insta-dispatched + // a preliminary CLOSED_CHANNEL event for a coop close upon first spend + // detection. It is cleared on a deep reorg of the close so a re-mined + // or replacement close still re-fires the early event. Only mutated + // from the closeObserver goroutine, so no lock is needed. + coopCloseEarlyDispatched bool } // newChainWatcher returns a new instance of a chainWatcher for a channel given @@ -711,12 +728,24 @@ type spendProcessResult struct { // // For single-confirmation mode (numConfs == 1), it immediately dispatches the // close event and returns empty result. For multi-confirmation mode, it -// registers for confirmations and returns the new pending state. +// registers for confirmations and returns the new pending state. In the +// async path, a coop close also triggers an early CLOSED_CHANNEL event over +// the channel notifier so RPC subscribers see the close at the same depth +// they did before the multi-confirmation reorg-aware dispatch was introduced. func (c *chainWatcher) processDetectedSpend( spend *chainntnfs.SpendDetail, source string, currentPendingSpend *chainntnfs.SpendDetail, currentConfNtfn *chainntnfs.ConfirmationEvent) spendProcessResult { + // For coop close spends, fire a preliminary CLOSED_CHANNEL event over + // the channel notifier as soon as the spend is first detected. This + // runs in both the fast and async paths so SubscribeChannelEvents + // subscribers see the close at the same depth they did before the + // multi-confirmation reorg-aware dispatch was introduced. The + // suppression of the duplicate notify at MarkChannelClosed time is + // handled in chain_arbitrator.go via a CloseType check. + c.maybeDispatchEarlyCoopClose(spend) + // FAST PATH: Single confirmation mode dispatches immediately. if c.handleSpendDispatch(spend, source) { if currentConfNtfn != nil { @@ -746,7 +775,8 @@ func (c *chainWatcher) processDetectedSpend( } } - // Different spend detected. Cancel existing confNtfn. + // Different spend detected (e.g. an RBF replacement). Cancel + // the existing confNtfn so we can re-register for the new tx. log.Warnf("ChannelPoint(%v): detected different spend tx %v, "+ "replacing pending tx %v", c.cfg.chanState.FundingOutpoint, @@ -943,6 +973,11 @@ func (c *chainWatcher) closeObserver() { confNtfn = nil pendingSpend = nil + // Clear the early-dispatch flag so a re-mined or + // replacement coop close re-fires the preliminary + // CLOSED_CHANNEL event with its own close summary. + c.coopCloseEarlyDispatched = false + // Reset the close confirmation height since the spend // was reorged out. err := c.cfg.chanState.ResetCloseConfirmationHeight() @@ -1347,26 +1382,70 @@ func (c *chainWatcher) requiredConfsForSpend() uint32 { }) } -// dispatchCooperativeClose processed a detect cooperative channel closure. -// We'll use the spending transaction to locate our output within the -// transaction, then clean up the database state. We'll also dispatch a -// notification to all subscribers that the channel has been closed in this -// manner. -func (c *chainWatcher) dispatchCooperativeClose(commitSpend *chainntnfs.SpendDetail) error { - broadcastTx := commitSpend.SpendingTx +// isCoopCloseSpend reports whether the supplied spending tx looks like a +// cooperative close. A coop close has a finalized input sequence number +// (either MaxTxInSequenceNum or MaxRBFSequence); regular commitment txns +// carry an obfuscated state hint in the sequence + locktime fields and +// won't match either constant. +func isCoopCloseSpend(spendingTx *wire.MsgTx) bool { + if len(spendingTx.TxIn) == 0 { + return false + } - log.Infof("Cooperative closure for ChannelPoint(%v): %v", - c.cfg.chanState.FundingOutpoint, - lnutils.SpewLogClosure(broadcastTx)) + switch spendingTx.TxIn[0].Sequence { + case wire.MaxTxInSequenceNum: + return true + case mempool.MaxRBFSequence: + return true + } + + return false +} + +// maybeDispatchEarlyCoopClose fires a preliminary CLOSED_CHANNEL event over +// the channel notifier the first time a coop close spend is detected on +// chain. It is a no-op if no early-dispatch callback was wired in, the spend +// is not a coop close, or an early dispatch has already happened for this +// close. The flag is cleared on a deep reorg of the close (in the closeObserver +// negativeConfChan handler) so a re-mined or replacement close re-fires. +func (c *chainWatcher) maybeDispatchEarlyCoopClose( + spend *chainntnfs.SpendDetail) { + + if c.coopCloseEarlyDispatched { + return + } + if c.cfg.notifyEarlyCoopClose == nil { + return + } + if !isCoopCloseSpend(spend.SpendingTx) { + return + } + + summary := c.buildCoopCloseSummary(spend) + + log.Infof("ChannelPoint(%v): dispatching early CLOSED_CHANNEL "+ + "event for coop close tx %v at height %d", + c.cfg.chanState.FundingOutpoint, spend.SpenderTxHash, + spend.SpendingHeight) + + c.cfg.notifyEarlyCoopClose(summary) + c.coopCloseEarlyDispatched = true +} - // If the input *is* final, then we'll check to see which output is - // ours. +// buildCoopCloseSummary constructs a ChannelCloseSummary for a cooperative +// close from the supplied spend detail. The summary is returned with +// IsPending=true; the channel arbitrator's MarkChannelClosed callback flips +// this to false after the close reaches the required confirmation depth. This +// helper is shared between the early insta-dispatch path (first conf, no DB +// persist) and the post-N-conf dispatch path so both surfaces produce +// equivalent summaries. +func (c *chainWatcher) buildCoopCloseSummary( + commitSpend *chainntnfs.SpendDetail) *channeldb.ChannelCloseSummary { + + broadcastTx := commitSpend.SpendingTx localAmt := c.toSelfAmount(broadcastTx) - // Once this is known, we'll mark the state as fully closed in the - // database. For cooperative closes, we wait for a confirmation depth - // determined by channel capacity before dispatching this event. - closeSummary := &channeldb.ChannelCloseSummary{ + summary := &channeldb.ChannelCloseSummary{ ChanPoint: c.cfg.chanState.FundingOutpoint, ChainHash: c.cfg.chanState.ChainHash, ClosingTXID: *commitSpend.SpenderTxHash, @@ -1388,9 +1467,26 @@ func (c *chainWatcher) dispatchCooperativeClose(commitSpend *chainntnfs.SpendDet log.Errorf("ChannelPoint(%v): unable to create channel sync "+ "message: %v", c.cfg.chanState.FundingOutpoint, err) } else { - closeSummary.LastChanSyncMsg = chanSync + summary.LastChanSyncMsg = chanSync } + return summary +} + +// dispatchCooperativeClose processed a detect cooperative channel closure. +// We'll use the spending transaction to locate our output within the +// transaction, then clean up the database state. We'll also dispatch a +// notification to all subscribers that the channel has been closed in this +// manner. +func (c *chainWatcher) dispatchCooperativeClose(commitSpend *chainntnfs.SpendDetail) error { + broadcastTx := commitSpend.SpendingTx + + log.Infof("Cooperative closure for ChannelPoint(%v): %v", + c.cfg.chanState.FundingOutpoint, + lnutils.SpewLogClosure(broadcastTx)) + + closeSummary := c.buildCoopCloseSummary(commitSpend) + // Create a summary of all the information needed to handle the // cooperative closure. closeInfo := &CooperativeCloseInfo{ @@ -1399,7 +1495,7 @@ func (c *chainWatcher) dispatchCooperativeClose(commitSpend *chainntnfs.SpendDet // If we have an aux closer, finalize the cooperative close now that // it's confirmed. - err = fn.MapOptionZ( + err := fn.MapOptionZ( c.cfg.auxCloser, func(aux AuxChanCloser) error { return c.finalizeCoopClose(aux, broadcastTx) }, diff --git a/contractcourt/chain_watcher_early_dispatch_test.go b/contractcourt/chain_watcher_early_dispatch_test.go new file mode 100644 index 00000000000..22f6fdcc4a8 --- /dev/null +++ b/contractcourt/chain_watcher_early_dispatch_test.go @@ -0,0 +1,114 @@ +package contractcourt + +import ( + "testing" + "time" + + "github.com/lightningnetwork/lnd/channeldb" + "github.com/stretchr/testify/require" +) + +// TestEarlyDispatchCoopClose verifies the headline behavior: when a +// cooperative close spend is first detected on chain in the async path +// (numConfs > 1), the chain watcher fires the early-notify callback exactly +// once with a summary that carries IsPending=true. The full N-conf flow +// still completes normally and produces the regular CooperativeCloseInfo +// downstream. +func TestEarlyDispatchCoopClose(t *testing.T) { + t.Parallel() + + harness := newChainWatcherTestHarness( + t, withRequiredConfs(3), withEarlyCoopCloseCapture(), + ) + + tx := harness.createCoopCloseTx(5000) + + harness.sendSpend(tx) + harness.waitForConfRegistration() + + // The early-dispatch callback must have fired exactly once with a + // preliminary close summary that identifies the right tx, channel + // point, and close type. + harness.waitForEarlyCoopClose(1, time.Second) + require.Equal(t, 1, harness.earlyCoopCloseCount(), + "exactly one early dispatch expected on first spend detection") + + earlySummary := harness.earlyCoopCloseAt(0) + require.True(t, earlySummary.IsPending, + "early dispatched summary must have IsPending=true") + require.Equal(t, channeldb.CooperativeClose, earlySummary.CloseType) + require.Equal(t, tx.TxHash(), earlySummary.ClosingTXID) + require.Equal(t, harness.aliceChannel.State().FundingOutpoint, + earlySummary.ChanPoint) + + // Drive the close to N confs so the regular post-N-conf dispatch + // path also completes; the resulting CooperativeCloseInfo must + // reference the same tx. + harness.mineBlocks(1) + harness.confirmTx(tx, harness.currentHeight) + + closeInfo := harness.waitForCoopClose(5 * time.Second) + harness.assertCoopCloseTx(closeInfo, tx) +} + +// TestEarlyDispatchForceCloseNotInvoked verifies that force-close spends do +// NOT trigger the early-dispatch callback. Force-close paths intentionally +// stay on the N-confirmation dispatch contract; their CLOSED_CHANNEL event +// fires from the channel arbitrator's MarkChannelClosed callback at N +// confs. +func TestEarlyDispatchForceCloseNotInvoked(t *testing.T) { + t.Parallel() + + harness := newChainWatcherTestHarness( + t, withRequiredConfs(3), withEarlyCoopCloseCapture(), + ) + + tx := harness.createRemoteForceCloseTx() + harness.sendSpend(tx) + harness.waitForConfRegistration() + + // Wait briefly so any spurious early dispatch would have landed. + time.Sleep(100 * time.Millisecond) + require.Equal(t, 0, harness.earlyCoopCloseCount(), + "remote force close must not trigger early dispatch") +} + +// TestEarlyDispatchReorgRefiresOnReReplacement verifies the reorg recovery +// path: once a deep reorg removes the close, the early-dispatch flag is +// cleared, and the next coop close re-fires the early event with its own +// summary. This is the contract that lets a subscriber observe each +// distinct close attempt rather than only the first one. +func TestEarlyDispatchReorgRefiresOnReReplacement(t *testing.T) { + t.Parallel() + + harness := newChainWatcherTestHarness( + t, withRequiredConfs(3), withEarlyCoopCloseCapture(), + ) + + tx1 := harness.createCoopCloseTx(5000) + tx2 := harness.createCoopCloseTx(4900) + + // First close detected → early dispatch #1. + harness.sendSpend(tx1) + harness.waitForConfRegistration() + harness.waitForEarlyCoopClose(1, time.Second) + + // Reorg flushes the conf ntfn and resets the flag. + harness.triggerReorg(tx1, 2) + harness.waitForSpendRegistration() + + // Replacement close detected → early dispatch #2 with the new tx. + harness.sendSpend(tx2) + harness.waitForConfRegistration() + harness.waitForEarlyCoopClose(2, 2*time.Second) + + require.Equal(t, 2, harness.earlyCoopCloseCount(), + "reorg + replacement close must re-fire the early dispatch") + + first := harness.earlyCoopCloseAt(0) + second := harness.earlyCoopCloseAt(1) + require.Equal(t, tx1.TxHash(), first.ClosingTXID, + "first early dispatch must reference tx1") + require.Equal(t, tx2.TxHash(), second.ClosingTXID, + "second early dispatch must reference the replacement tx2") +} diff --git a/contractcourt/chain_watcher_test_harness.go b/contractcourt/chain_watcher_test_harness.go index 09ab0359249..e67b84abc95 100644 --- a/contractcourt/chain_watcher_test_harness.go +++ b/contractcourt/chain_watcher_test_harness.go @@ -1,6 +1,7 @@ package contractcourt import ( + "sync" "testing" "time" @@ -46,6 +47,14 @@ type chainWatcherTestHarness struct { // blockbeatProcessed is a channel that signals when a blockbeat has // been processed. blockbeatProcessed chan struct{} + + // earlyCoopCloseMu guards earlyCoopCloseSummaries. + earlyCoopCloseMu sync.Mutex + + // earlyCoopCloseSummaries records every invocation of the + // notifyEarlyCoopClose callback when captureEarlyCoopClose was + // enabled. Tests assert against length and contents. + earlyCoopCloseSummaries []*channeldb.ChannelCloseSummary } // mockChainNotifier extends the standard mock with additional channels for @@ -136,6 +145,11 @@ type harnessOpt func(*harnessConfig) // harnessConfig holds configuration for the test harness. type harnessConfig struct { requiredConfs fn.Option[uint32] + + // captureEarlyCoopClose, when true, wires a notifyEarlyCoopClose + // callback into the chain watcher that records each invocation onto + // the harness's earlyCoopCloseSummaries slice for assertions. + captureEarlyCoopClose bool } // withRequiredConfs sets the number of confirmations required for channel @@ -146,6 +160,15 @@ func withRequiredConfs(confs uint32) harnessOpt { } } +// withEarlyCoopCloseCapture enables recording of every invocation of the +// chain watcher's notifyEarlyCoopClose callback on the harness so tests can +// assert when, how often, and with which summary the early-dispatch fires. +func withEarlyCoopCloseCapture() harnessOpt { + return func(cfg *harnessConfig) { + cfg.captureEarlyCoopClose = true + } +} + // newChainWatcherTestHarness creates a new test harness for chain watcher // tests. func newChainWatcherTestHarness(t *testing.T, @@ -194,13 +217,38 @@ func newChainWatcherTestHarnessFromReporter(t *testing.T, spendRegistered: make(chan struct{}, 10), } + harness := &chainWatcherTestHarness{ + t: reporter, + aliceChannel: aliceChannel, + bobChannel: bobChannel, + notifier: notifier, + currentHeight: 100, + blockbeatProcessed: make(chan struct{}), + } + + // If the test wants to observe early-dispatch invocations, install + // a callback that records each summary onto the harness. + var notifyEarlyCoopClose func(*channeldb.ChannelCloseSummary) + if cfg.captureEarlyCoopClose { + notifyEarlyCoopClose = func( + s *channeldb.ChannelCloseSummary) { + + harness.earlyCoopCloseMu.Lock() + harness.earlyCoopCloseSummaries = append( + harness.earlyCoopCloseSummaries, s, + ) + harness.earlyCoopCloseMu.Unlock() + } + } + // Create chain watcher. chainWatcher, err := newChainWatcher(chainWatcherConfig{ - chanState: aliceChannel.State(), - notifier: notifier, - signer: aliceChannel.Signer, - extractStateNumHint: lnwallet.GetStateNumHint, - chanCloseConfs: cfg.requiredConfs, + chanState: aliceChannel.State(), + notifier: notifier, + signer: aliceChannel.Signer, + extractStateNumHint: lnwallet.GetStateNumHint, + chanCloseConfs: cfg.requiredConfs, + notifyEarlyCoopClose: notifyEarlyCoopClose, contractBreach: func( retInfo *lnwallet.BreachRetribution, ) error { @@ -222,16 +270,8 @@ func newChainWatcherTestHarnessFromReporter(t *testing.T, // Subscribe to channel events. chanEvents := chainWatcher.SubscribeChannelEvents() - harness := &chainWatcherTestHarness{ - t: reporter, - aliceChannel: aliceChannel, - bobChannel: bobChannel, - chainWatcher: chainWatcher, - notifier: notifier, - chanEvents: chanEvents, - currentHeight: 100, - blockbeatProcessed: make(chan struct{}), - } + harness.chainWatcher = chainWatcher + harness.chanEvents = chanEvents // Wait for the initial spend registration that happens in Start(). harness.waitForSpendRegistration() @@ -407,6 +447,56 @@ func (h *chainWatcherTestHarness) mineBlocks(n int32) { h.currentHeight += n } +// earlyCoopCloseCount returns how many times the early-dispatch callback has +// fired since the harness started. +func (h *chainWatcherTestHarness) earlyCoopCloseCount() int { + h.earlyCoopCloseMu.Lock() + defer h.earlyCoopCloseMu.Unlock() + + return len(h.earlyCoopCloseSummaries) +} + +// earlyCoopCloseAt returns the early-dispatch summary recorded at the given +// index. The harness fails the test if the index is out of range. +func (h *chainWatcherTestHarness) earlyCoopCloseAt( + idx int) *channeldb.ChannelCloseSummary { + + h.earlyCoopCloseMu.Lock() + defer h.earlyCoopCloseMu.Unlock() + + if idx >= len(h.earlyCoopCloseSummaries) { + h.t.Fatalf("expected early-dispatch index %d, only %d "+ + "summaries recorded", idx, + len(h.earlyCoopCloseSummaries)) + } + + return h.earlyCoopCloseSummaries[idx] +} + +// waitForEarlyCoopClose blocks until at least the supplied count of +// early-dispatch invocations have been recorded, or the timeout elapses. +func (h *chainWatcherTestHarness) waitForEarlyCoopClose(want int, + timeout time.Duration) { + + h.t.Helper() + + deadline := time.Now().Add(timeout) + for { + if h.earlyCoopCloseCount() >= want { + return + } + if time.Now().After(deadline) { + h.t.Fatalf("expected %d early-dispatch invocations, "+ + "got %d after %v", want, + h.earlyCoopCloseCount(), timeout) + + return + } + + time.Sleep(10 * time.Millisecond) + } +} + // waitForCoopClose waits for a cooperative close event and returns it. func (h *chainWatcherTestHarness) waitForCoopClose( timeout time.Duration) *CooperativeCloseInfo { diff --git a/itest/list_on_test.go b/itest/list_on_test.go index c8e4244e7e3..5b9a7521fbd 100644 --- a/itest/list_on_test.go +++ b/itest/list_on_test.go @@ -463,6 +463,10 @@ var allTestCases = []*lntest.TestCase{ Name: "zero conf channel open", TestFunc: testZeroConfChannelOpen, }, + { + Name: "zero conf coop close subscribe events", + TestFunc: testZeroConfCoopCloseSubscribeEvents, + }, { Name: "option scid alias", TestFunc: testOptionScidAlias, diff --git a/itest/lnd_zero_conf_close_event_test.go b/itest/lnd_zero_conf_close_event_test.go new file mode 100644 index 00000000000..fa824b8d31f --- /dev/null +++ b/itest/lnd_zero_conf_close_event_test.go @@ -0,0 +1,243 @@ +package itest + +import ( + "time" + + "github.com/btcsuite/btcd/btcutil" + "github.com/lightningnetwork/lnd/lnrpc" + "github.com/lightningnetwork/lnd/lntest" + "github.com/lightningnetwork/lnd/lntest/rpc" + "github.com/lightningnetwork/lnd/lntest/wait" + "github.com/stretchr/testify/require" +) + +// testZeroConfCoopCloseSubscribeEvents exercises the regression that was +// reported when production builds switched to a multi-confirmation +// reorg-aware close dispatch: SubscribeChannelEvents stopped emitting +// CLOSED_CHANNEL on cooperative closes for zero-conf channels until the +// close had reached the full confirmation depth, instead of firing at first +// detection like v0.20.1. +// +// The fix wires an early-dispatch callback into the chain watcher that fires +// a preliminary CLOSED_CHANNEL event over the channel notifier as soon as a +// coop close spend lands on chain. This test asserts: +// +// 1. CLOSED_CHANNEL fires on the SubscribeChannelEvents stream after only +// the first confirmation of the close tx (not after the full N=3 +// depth required by --dev.force-channel-close-confs=3). +// 2. FULLY_RESOLVED_CHANNEL fires once the close has reached N confs and +// the channel arbitrator has finished its resolution flow. +// 3. Exactly one CLOSED_CHANNEL is delivered — the suppression logic in +// the channel arbitrator drops the duplicate that would otherwise fire +// from MarkChannelClosed at N confs. +func testZeroConfCoopCloseSubscribeEvents(ht *lntest.HarnessTest) { + // Force coop close to require 3 confs so we exercise the async path + // in the chain watcher (the same path production hits via + // CloseConfsForCapacity). + const requiredConfs = 3 + + // Zero-conf channels need option-scid-alias and anchors; force-confs + // is what flips us out of the numConfs==1 fast-path so we can verify + // the early dispatch is what surfaces the CLOSED_CHANNEL event. + nodeArgs := []string{ + "--protocol.option-scid-alias", + "--protocol.zero-conf", + "--protocol.anchors", + "--dev.force-channel-close-confs=3", + } + + alice := ht.NewNode("Alice", nodeArgs) + bob := ht.NewNode("Bob", nodeArgs) + + ht.FundCoins(btcutil.SatoshiPerBitcoin, alice) + ht.EnsureConnected(alice, bob) + + // A channel acceptor on Bob is needed to allow the zero-conf + // negotiation to succeed. + acceptStream, cancelAcceptor := bob.RPC.ChannelAcceptor() + go acceptChannel(ht.T, true, acceptStream) + + const chanAmt = btcutil.Amount(1_000_000) + openParams := lntest.OpenChannelParams{ + Amt: chanAmt, + Private: true, + CommitmentType: lnrpc.CommitmentType_ANCHORS, + ZeroConf: true, + } + stream := ht.OpenChannelAssertStream(alice, bob, openParams) + cancelAcceptor() + + // Wait for the channel-open update — for zero-conf this arrives + // without any blocks needing to be mined. + chanPoint := ht.WaitForChannelOpenEvent(stream) + ht.AssertChannelInGraph(alice, chanPoint) + ht.AssertChannelInGraph(bob, chanPoint) + + // Subscribe Alice to channel events BEFORE we initiate the close so + // we capture the full close lifecycle on the wire. + chanSub := alice.RPC.SubscribeChannelEvents() + + // Alice initiates the cooperative close; NoWait so the closing tx + // just lands in the mempool. + closeStream, _ := ht.CloseChannelAssertPending(alice, chanPoint, false) + + // Mine a single block so the close tx confirms once. With the fix in + // place, the chain watcher's processDetectedSpend should + // insta-dispatch CLOSED_CHANNEL over the notifier as soon as the + // spend lands, even though the async path is still waiting on two + // more confs before driving MarkChannelClosed. + ht.MineBlocksAndAssertNumTxes(1, 1) + + closedSeen := waitForChannelEventOfType( + ht, chanSub, + lnrpc.ChannelEventUpdate_CLOSED_CHANNEL, + ) + require.NotNil( + ht, closedSeen, + "CLOSED_CHANNEL must fire after the first conf of the "+ + "close tx (regression: production was waiting for "+ + "the full 3-conf depth)", + ) + + // The CLOSED_CHANNEL summary must reflect a cooperative close + // initiated by the local node. + closedSummary := closedSeen.GetClosedChannel() + require.NotNil(ht, closedSummary, + "CLOSED_CHANNEL update must carry a close summary") + require.Equal(ht, + lnrpc.ChannelCloseSummary_COOPERATIVE_CLOSE, + closedSummary.CloseType, + ) + require.Equal(ht, + lnrpc.Initiator_INITIATOR_LOCAL, + closedSummary.CloseInitiator, + ) + + // Mine the remaining confs needed to take the close to its final + // resolution state. + ht.MineBlocksAndAssertNumTxes(requiredConfs-1, 0) + + // Drain the close-channel client stream so the test cleanup path + // doesn't hang on it. + go func() { + for { + if _, err := closeStream.Recv(); err != nil { + return + } + } + }() + + // FULLY_RESOLVED_CHANNEL must arrive after the close advances + // through the channel arbitrator at full N confs. + resolvedSeen := waitForChannelEventOfType( + ht, chanSub, + lnrpc.ChannelEventUpdate_FULLY_RESOLVED_CHANNEL, + ) + require.NotNil(ht, resolvedSeen, + "FULLY_RESOLVED_CHANNEL must fire after the close reaches "+ + "the required confirmation depth") + + // Crucially: no second CLOSED_CHANNEL event should have arrived + // between the early one and the FULLY_RESOLVED_CHANNEL. The + // channel-arbitrator-side suppression drops the duplicate that + // MarkChannelClosed would otherwise emit. We give the stream a brief + // quiet window to surface any straggling events. + assertNoMoreClosedEvents(ht, chanSub, 500*time.Millisecond) +} + +// waitForChannelEventOfType drains the channel events subscription until +// one of the supplied type lands or the harness's default timeout elapses. +// Other event types (PENDING_OPEN, OPEN, ACTIVE, INACTIVE, CHANNEL_UPDATE) +// are expected during a normal close flow and must be tolerated rather than +// fail the test. +func waitForChannelEventOfType(ht *lntest.HarnessTest, + sub rpc.ChannelEventsClient, + want lnrpc.ChannelEventUpdate_UpdateType) *lnrpc.ChannelEventUpdate { + + type result struct { + event *lnrpc.ChannelEventUpdate + err error + } + + results := make(chan result, 1) + deadline := time.After(wait.DefaultTimeout) + + go func() { + for { + ev, err := sub.Recv() + if err != nil { + results <- result{err: err} + return + } + if ev.Type == want { + results <- result{event: ev} + return + } + } + }() + + select { + case r := <-results: + require.NoErrorf(ht, r.err, + "error from channel event stream while waiting "+ + "for %v", want) + + return r.event + + case <-deadline: + ht.Fatalf("timed out waiting for channel event %v", want) + return nil + } +} + +// assertNoMoreClosedEvents reads from the subscription for the supplied +// quiet window and fails the test if a CLOSED_CHANNEL event is observed. +// FULLY_RESOLVED_CHANNEL has already been observed by this point so a +// second CLOSED_CHANNEL would represent the duplicate-notify regression +// that the suppression logic is meant to prevent. +func assertNoMoreClosedEvents(ht *lntest.HarnessTest, + sub rpc.ChannelEventsClient, window time.Duration) { + + done := make(chan struct{}) + defer close(done) + + errs := make(chan error, 1) + dups := make(chan *lnrpc.ChannelEventUpdate, 1) + + go func() { + for { + ev, err := sub.Recv() + if err != nil { + select { + case errs <- err: + case <-done: + } + return + } + if ev.Type == + lnrpc.ChannelEventUpdate_CLOSED_CHANNEL { + + select { + case dups <- ev: + case <-done: + } + return + } + } + }() + + select { + case dup := <-dups: + ht.Fatalf("unexpected duplicate CLOSED_CHANNEL event: %v", + dup) + + case err := <-errs: + // Stream EOF or context cancel is fine; we just want the + // quiet window to elapse without a duplicate. + _ = err + + case <-time.After(window): + // Quiet window elapsed without a duplicate. Pass. + } +} + diff --git a/server.go b/server.go index 83131c6f2b3..e5307b2416a 100644 --- a/server.go +++ b/server.go @@ -1404,6 +1404,7 @@ func newServer(ctx context.Context, cfg *Config, listenAddrs []net.Addr, Sweeper: s.sweeper, Registry: s.invoices, NotifyClosedChannel: s.channelNotifier.NotifyClosedChannelEvent, + NotifyEarlyClosedChannel: s.channelNotifier.NotifyEarlyClosedChannelEvent, NotifyFullyResolvedChannel: s.channelNotifier.NotifyFullyResolvedChannelEvent, OnionProcessor: s.sphinxPayment, PaymentsExpirationGracePeriod: cfg.PaymentsExpirationGracePeriod,