Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 189 additions & 0 deletions chainntnfs/bitcoindnotify/bitcoind.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@
notificationCancels chan interface{}
notificationRegistry chan interface{}

historicalPkScriptDispatches chan *chainntnfs.HistoricalPkScriptDispatch
historicalPkScriptDispatchSlots chan struct{}

txNotifier *chainntnfs.TxNotifier

blockEpochClients map[uint64]*blockEpochRegistration
Expand Down Expand Up @@ -99,6 +102,14 @@

notificationCancels: make(chan interface{}),
notificationRegistry: make(chan interface{}),
historicalPkScriptDispatches: make(
chan *chainntnfs.HistoricalPkScriptDispatch,
chainntnfs.MaxPkScriptHistoricalDispatchQueueSize,
),
historicalPkScriptDispatchSlots: make(
chan struct{},
chainntnfs.MaxPkScriptHistoricalDispatchQueueSize,
),

blockEpochClients: make(map[uint64]*blockEpochRegistration),

Expand Down Expand Up @@ -206,6 +217,9 @@
b.wg.Add(1)
go b.notificationDispatcher()

b.wg.Add(1)
go b.historicalPkScriptDispatcher()

// Set the active flag now that we've completed the full
// startup.
atomic.StoreInt32(&b.active, 1)
Expand Down Expand Up @@ -351,6 +365,16 @@
}
}(msg)

case *chainntnfs.HistoricalPkScriptDispatch:
err := b.queueHistoricalPkScriptDispatch(msg)
if err != nil {
chainntnfs.Log.Errorf("Unable to queue "+
"historical pkScript dispatch for "+
"sub=%d within range %d-%d: %v",
msg.SubscriptionID, msg.StartHeight,
msg.EndHeight, err)
}

case *blockEpochRegistration:
chainntnfs.Log.Infof("New block epoch subscription")

Expand Down Expand Up @@ -917,6 +941,128 @@
return nil, nil
}

// queueHistoricalPkScriptDispatch reserves capacity and queues a historical
// pkScript scan for the bitcoind backend.
func (b *BitcoindNotifier) queueHistoricalPkScriptDispatch(
msg *chainntnfs.HistoricalPkScriptDispatch) error {

if msg == nil {
return nil
}

err := b.reserveHistoricalPkScriptDispatchSlot()
if err != nil {
return err
}

return b.queueReservedHistoricalPkScriptDispatch(msg)
}

// reserveHistoricalPkScriptDispatchSlot reserves one pending historical pkScript
// scan slot.
func (b *BitcoindNotifier) reserveHistoricalPkScriptDispatchSlot() error {
select {
case <-b.quit:
return chainntnfs.ErrChainNotifierShuttingDown
default:
}

select {
case b.historicalPkScriptDispatchSlots <- struct{}{}:
return nil

case <-b.quit:
return chainntnfs.ErrChainNotifierShuttingDown

default:
return fmt.Errorf("%w: pending scans %d exceeds limit %d",
chainntnfs.ErrTooManyHistoricalPkScriptScans,
len(b.historicalPkScriptDispatchSlots),
chainntnfs.MaxPkScriptHistoricalDispatchQueueSize)
}
}

// releaseHistoricalPkScriptDispatchSlot releases one pending historical pkScript
// scan slot.
func (b *BitcoindNotifier) releaseHistoricalPkScriptDispatchSlot() {
select {
case <-b.historicalPkScriptDispatchSlots:
default:
}
}

// queueReservedHistoricalPkScriptDispatch queues a dispatch after capacity has
// already been reserved.
func (b *BitcoindNotifier) queueReservedHistoricalPkScriptDispatch(
msg *chainntnfs.HistoricalPkScriptDispatch) error {

select {
case b.historicalPkScriptDispatches <- msg:
return nil

case <-b.quit:
b.releaseHistoricalPkScriptDispatchSlot()

return chainntnfs.ErrChainNotifierShuttingDown
}
}

// historicalPkScriptDispatcher serially executes queued historical pkScript
// scans.
func (b *BitcoindNotifier) historicalPkScriptDispatcher() {
defer b.wg.Done()

for {
select {
case msg := <-b.historicalPkScriptDispatches:
err := b.historicalPkScriptDispatch(msg)
b.releaseHistoricalPkScriptDispatchSlot()
if err != nil {
chainntnfs.Log.Errorf("Historical pkScript dispatch "+

Check failure on line 1021 in chainntnfs/bitcoindnotify/bitcoind.go

View workflow job for this annotation

GitHub Actions / Lint code

the line is 86 characters long, which exceeds the maximum of 80 characters. (ll)
"for sub=%d within range %d-%d failed: %v",
msg.SubscriptionID, msg.StartHeight,
msg.EndHeight, err)
}

case <-b.quit:
return
}
}
}
Comment on lines +944 to +1031
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

There's significant code duplication for the historical pkScript dispatch logic across bitcoindnotify, btcdnotify, and neutrinonotify backends. The functions queueHistoricalPkScriptDispatch, reserveHistoricalPkScriptDispatchSlot, releaseHistoricalPkScriptDispatchSlot, queueReservedHistoricalPkScriptDispatch, and historicalPkScriptDispatcher are nearly identical in all three files.

Consider refactoring this logic into a shared helper struct that can be embedded by each notifier. This would improve maintainability and reduce redundancy.


// historicalPkScriptDispatch manually scans blocks for pkScript activity for a
// single subscription.
func (b *BitcoindNotifier) historicalPkScriptDispatch(
msg *chainntnfs.HistoricalPkScriptDispatch) error {

return b.txNotifier.SyncHistoricalPkScriptDispatch(
msg,
func(height uint32) error {
select {
case <-b.quit:
return chainntnfs.ErrChainNotifierShuttingDown
default:
}

blockHash, err := b.chainConn.GetBlockHash(int64(height))
if err != nil {
return fmt.Errorf("unable to retrieve hash for block "+

Check failure on line 1049 in chainntnfs/bitcoindnotify/bitcoind.go

View workflow job for this annotation

GitHub Actions / Lint code

the line is 87 characters long, which exceeds the maximum of 80 characters. (ll)
"with height %d: %v", height, err)
}

block, err := b.GetBlock(blockHash)
if err != nil {
return fmt.Errorf("unable to retrieve block with hash "+

Check failure on line 1055 in chainntnfs/bitcoindnotify/bitcoind.go

View workflow job for this annotation

GitHub Actions / Lint code

the line is 88 characters long, which exceeds the maximum of 80 characters. (ll)
"%v: %v", blockHash, err)
}

return b.txNotifier.ProcessHistoricalPkScriptBlockWithDispatch(

Check failure on line 1059 in chainntnfs/bitcoindnotify/bitcoind.go

View workflow job for this annotation

GitHub Actions / Lint code

the line is 87 characters long, which exceeds the maximum of 80 characters. (ll)
msg, btcutil.NewBlock(block), height,
)
},
)
}

// RegisterConfirmationsNtfn registers an intent to be notified once the target
// txid/output script has reached numConfs confirmations on-chain. When
// intending to be notified of the confirmation of an output script, a nil txid
Expand Down Expand Up @@ -953,6 +1099,49 @@
}
}

// RegisterPkScriptNotifier creates a new pkScript notification stream.
func (b *BitcoindNotifier) RegisterPkScriptNotifier() (
*chainntnfs.PkScriptNotificationRegistration, error) {

reg, err := b.txNotifier.RegisterPkScriptNotifier()
if err != nil {
return nil, err
}

reg.Event.AddPkScripts = func(pkScripts [][]byte,
opts ...chainntnfs.NotifierOption) (*chainntnfs.PkScriptAddResult,
error) {

dispatch, _, addedScripts, err := reg.AddPkScripts(
pkScripts, opts...,
)
if err != nil {
return nil, err
}
result := chainntnfs.NewPkScriptAddResult(dispatch, addedScripts)

err = b.queueHistoricalPkScriptDispatch(dispatch)
if err != nil {
removeErr := reg.RemovePkScripts(addedScripts)
if removeErr != nil {
return nil, fmt.Errorf("unable to queue historical "+
"pkScript scan: %w, rollback failed: %v",
err, removeErr)
}

return nil, err
}

return result, nil
}

reg.Event.RemovePkScripts = func(pkScripts [][]byte) error {
return reg.RemovePkScripts(pkScripts)
}

return reg.Event, nil
}

// blockEpochRegistration represents a client's intent to receive a
// notification with each newly connected block.
type blockEpochRegistration struct {
Expand Down
Loading
Loading