Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions channelnotifier/channelnotifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,23 @@ func (c *ChannelNotifier) NotifyClosedChannelEvent(chanPoint wire.OutPoint) {
}
}

// NotifyEarlyClosedChannelEvent dispatches a ClosedChannelEvent built from the
// supplied close summary, without consulting the channel database. This is
// used by the chain watcher to insta-dispatch CLOSED_CHANNEL events to RPC
// subscribers as soon as a coop close is first detected on chain, before the
// async N-conf path has persisted the close in the database. The summary's
// IsPending field will typically be true at this point; callers should set it
// accordingly.
func (c *ChannelNotifier) NotifyEarlyClosedChannelEvent(
summary *channeldb.ChannelCloseSummary) {

event := ClosedChannelEvent{CloseSummary: summary}
if err := c.ntfnServer.SendUpdate(event); err != nil {
log.Warnf("Unable to send early closed channel update: %v",
err)
}
}

// NotifyFullyResolvedChannelEvent notifies the channelEventNotifier goroutine
// that a channel was fully resolved on chain.
func (c *ChannelNotifier) NotifyFullyResolvedChannelEvent(
Expand Down
89 changes: 89 additions & 0 deletions channelnotifier/channelnotifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
"testing"
"time"

"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -41,3 +43,90 @@
t.Fatalf("expected to receive channel update event")
}
}

// TestNotifyEarlyClosedChannelEvent verifies that the early-dispatch path
// delivers exactly the supplied close summary to subscribers without
// consulting the channel database. This is the path used by the chain watcher
// at first conf to insta-dispatch CLOSED_CHANNEL events for cooperative
// closes, before the close summary is persisted.
func TestNotifyEarlyClosedChannelEvent(t *testing.T) {
t.Parallel()

// Pass nil for chanDB; the early-dispatch path must not touch it.
ntfnServer := New(nil)
require.NoError(t, ntfnServer.Start())
t.Cleanup(func() {
require.NoError(t, ntfnServer.Stop())
})

sub, err := ntfnServer.SubscribeChannelEvents()
require.NoError(t, err)
t.Cleanup(sub.Cancel)

// Build a close summary with IsPending=true to mirror what the chain
// watcher will hand in at first-conf detection.
chanPoint := wire.OutPoint{
Hash: chainhash.Hash{0x01, 0x02, 0x03},
Index: 4,
}
summary := &channeldb.ChannelCloseSummary{
ChanPoint: chanPoint,
CloseType: channeldb.CooperativeClose,
IsPending: true,
}

ntfnServer.NotifyEarlyClosedChannelEvent(summary)

select {
case event := <-sub.Updates():
closedEvent, ok := event.(ClosedChannelEvent)
require.True(t, ok, "expected ClosedChannelEvent, got %T", event)

Check failure on line 83 in channelnotifier/channelnotifier_test.go

View workflow job for this annotation

GitHub Actions / Lint code

the line is 81 characters long, which exceeds the maximum of 80 characters. (ll)
require.NotNil(t, closedEvent.CloseSummary)
require.True(t, closedEvent.CloseSummary.IsPending,
"early dispatched summary must carry IsPending=true")
require.Equal(t, summary, closedEvent.CloseSummary,
"early dispatched summary must reach subscriber "+
"verbatim")

case <-time.After(time.Second):
t.Fatal("expected to receive early closed channel event")
}
}

// TestNotifyEarlyClosedChannelEventSingleEvent guards against accidental
// re-dispatch: a single early-notify call must produce exactly one event,
// not two (e.g. a fan-out bug between the early and the legacy paths).
func TestNotifyEarlyClosedChannelEventSingleEvent(t *testing.T) {
t.Parallel()

ntfnServer := New(nil)
require.NoError(t, ntfnServer.Start())
t.Cleanup(func() {
require.NoError(t, ntfnServer.Stop())
})

sub, err := ntfnServer.SubscribeChannelEvents()
require.NoError(t, err)
t.Cleanup(sub.Cancel)

summary := &channeldb.ChannelCloseSummary{
ChanPoint: wire.OutPoint{Index: 7},
CloseType: channeldb.CooperativeClose,
IsPending: true,
}
ntfnServer.NotifyEarlyClosedChannelEvent(summary)

// Drain the single expected event.
select {
case <-sub.Updates():
case <-time.After(time.Second):
t.Fatal("expected to receive early closed channel event")
}

// Any further read should not produce another event.
select {
case extra := <-sub.Updates():
t.Fatalf("unexpected second event: %T", extra)
case <-time.After(50 * time.Millisecond):
}
}
55 changes: 39 additions & 16 deletions contractcourt/chain_arbitrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,15 @@
// will use to notify the ChannelNotifier about a newly closed channel.
NotifyClosedChannel func(wire.OutPoint)

// NotifyEarlyClosedChannel is invoked by the chain watcher when a
// cooperative close spend is first detected on chain, before the close
// summary has been persisted to the closed-channel bucket. It allows
// the channel notifier to dispatch a CLOSED_CHANNEL event over RPC at
// the same depth it did before the multi-confirmation reorg-aware
// dispatch was introduced. The follow-up persist + state advance still
// waits for the full required confirmation count.
NotifyEarlyClosedChannel func(*channeldb.ChannelCloseSummary)

// NotifyFullyResolvedChannel is a function closure that the
// ChainArbitrator will use to notify the ChannelNotifier about a newly
// resolved channel. The main difference to NotifyClosedChannel is that
Expand Down Expand Up @@ -451,7 +460,19 @@
if err != nil {
return err
}
c.cfg.NotifyClosedChannel(summary.ChanPoint)

// For cooperative closes the chain watcher already
// fired a preliminary CLOSED_CHANNEL event over the
// channel notifier when the spend was first detected
// on chain. Suppressing the notify here keeps the
// SubscribeChannelEvents stream emitting a single
// CLOSED_CHANNEL per close, matching the v0.20.1
// surface. Force/breach closes still notify here
// since they don't take the early-dispatch path.
if summary.CloseType != channeldb.CooperativeClose {
c.cfg.NotifyClosedChannel(summary.ChanPoint)
}

return nil
},
IsPendingClose: false,
Expand Down Expand Up @@ -1145,11 +1166,12 @@
chanPoint, retInfo,
)
},
extractStateNumHint: lnwallet.GetStateNumHint,
auxLeafStore: c.cfg.AuxLeafStore,
auxResolver: c.cfg.AuxResolver,
auxCloser: c.cfg.AuxCloser,
chanCloseConfs: c.cfg.ChannelCloseConfs,
extractStateNumHint: lnwallet.GetStateNumHint,
auxLeafStore: c.cfg.AuxLeafStore,
auxResolver: c.cfg.AuxResolver,
auxCloser: c.cfg.AuxCloser,
chanCloseConfs: c.cfg.ChannelCloseConfs,
notifyEarlyCoopClose: c.cfg.NotifyEarlyClosedChannel,
},
)
if err != nil {
Expand Down Expand Up @@ -1319,16 +1341,17 @@

chainWatcher, err := newChainWatcher(
chainWatcherConfig{
chanState: channel,
notifier: c.cfg.Notifier,
signer: c.cfg.Signer,
isOurAddr: c.cfg.IsOurAddress,
contractBreach: breachClosure,
extractStateNumHint: lnwallet.GetStateNumHint,
auxLeafStore: c.cfg.AuxLeafStore,
auxResolver: c.cfg.AuxResolver,
auxCloser: c.cfg.AuxCloser,
chanCloseConfs: c.cfg.ChannelCloseConfs,
chanState: channel,
notifier: c.cfg.Notifier,
signer: c.cfg.Signer,
isOurAddr: c.cfg.IsOurAddress,
contractBreach: breachClosure,
extractStateNumHint: lnwallet.GetStateNumHint,
auxLeafStore: c.cfg.AuxLeafStore,
auxResolver: c.cfg.AuxResolver,
auxCloser: c.cfg.AuxCloser,
chanCloseConfs: c.cfg.ChannelCloseConfs,
notifyEarlyCoopClose: c.cfg.NotifyEarlyClosedChannel,

Check failure on line 1354 in contractcourt/chain_arbitrator.go

View workflow job for this annotation

GitHub Actions / Lint code

the line is 85 characters long, which exceeds the maximum of 80 characters. (ll)
},
)
if err != nil {
Expand Down
Loading
Loading