Skip to content
7 changes: 5 additions & 2 deletions consensus/bor/bor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1489,10 +1489,13 @@ func (c *Bor) Seal(chain consensus.ChainHeaderReader, block *types.Block, witnes
"headerDifficulty", header.Difficulty,
)
}
// Block on send (or exit on stop). A default branch here would
// drop the result silently when results is full, leaking the
// miner's pendingTasks entry.
select {
case results <- &consensus.NewSealedBlockEvent{Block: block.WithSeal(header), Witness: witness}:
default:
log.Warn("Sealing result was not read by miner", "number", number, "sealhash", SealHash(header, c.config))
case <-stop:
log.Info("Seal interrupted before result delivery", "number", number, "sealhash", SealHash(header, c.config))
}
}()

Expand Down
81 changes: 81 additions & 0 deletions consensus/bor/bor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2652,6 +2652,87 @@ func TestSeal_AuthorizedSigner(t *testing.T) {
}
}

// TestSeal_BlocksOnFullResultChannelInsteadOfSilentDrop is a regression test
// for the silent-drop in Bor.Seal's result-delivery goroutine.
//
// Bug: the goroutine's second select used `default` as the not-sent path:
//
// select {
// case results <- &consensus.NewSealedBlockEvent{...}:
// default:
// log.Warn("Sealing result was not read by miner", ...)
// }
//
// When the result channel had no ready receiver (e.g., resultLoop blocked on
// a slow chain.WriteBlockAndSetHead under elephant-contract load), `default`
// fired immediately and the result was discarded without posting. The
// worker's pendingTasks entry for this sealhash would then leak: resultLoop
// never received it, never deleted it, and the producer's veblop fallback
// would short-circuit on hasPendingTasks > 0 every tick afterwards. This is
// post-mortem candidate 2 for the 2026-05-07 Amoy val4 stall.
//
// Fix: replace `default` with `case <-stop` so the goroutine either delivers
// or exits cleanly on interrupt. The taskLoop's interrupt() (with its
// pendingTasks-delete companion fix) is then the single place that cleans
// up on the stop path.
//
// Test scheme: drive Seal with a zero-buffer results channel and NO reader
// initially. With the bug, the goroutine immediately drops via `default`
// and a subsequent receive times out. With the fix, the goroutine blocks
// on send until our receive arrives, and we get the result back.
func TestSeal_BlocksOnFullResultChannelInsteadOfSilentDrop(t *testing.T) {
	t.Parallel()

	env := newSignedChainSetup(t)
	engine := env.bor
	engine.fakeDiff = true

	engine.Authorize(env.signerAddr, func(acct accounts.Account, mime string, payload []byte) ([]byte, error) {
		return crypto.Sign(crypto.Keccak256(payload), env.privKey)
	})

	// A header timestamp slightly in the future forces the sealing
	// goroutine through a real time.After(delay) wait, so it reaches its
	// second select *before* our receive below — exactly the window in
	// which the buggy `default` branch used to fire (no receiver ready,
	// no buffer slot, no fallback).
	const delaySec = 1
	header := env.makeSignedHeader(t, 1, env.genesis)
	header.Time = uint64(time.Now().Unix()) + delaySec

	blockBody := &types.Body{Transactions: types.Transactions{types.NewTx(&types.LegacyTx{})}}
	blk := types.NewBlock(header, blockBody, nil, trie.NewStackTrie(nil))

	// Unbuffered channel with no receiver ready when the goroutine hits
	// its second select. Pre-fix: `default` fires and the result is
	// silently dropped. Post-fix: the send parks until our delayed
	// receive pairs with it (or stop closes — which never happens here).
	resultCh := make(chan *consensus.NewSealedBlockEvent)
	stopCh := make(chan struct{})

	err := engine.Seal(env.chain.HeaderChain(), blk, nil, resultCh, stopCh)
	require.NoError(t, err, "Seal should return nil and spawn the sealing goroutine")

	// Critical ordering: wait for the goroutine's time.After(delay) to
	// fire AND for it to advance into the second select. With the bug,
	// by the time we receive below the result is already gone; with the
	// fix, the goroutine is parked on the send waiting for any receiver.
	time.Sleep(time.Duration(delaySec)*time.Second + 500*time.Millisecond)

	select {
	case got := <-resultCh:
		require.NotNil(t, got, "expected a sealed block event")
		require.NotNil(t, got.Block, "expected ev.Block to be non-nil")
		require.Equal(t, blk.NumberU64(), got.Block.NumberU64(),
			"sealed block number should match the input block")
	case <-time.After(2 * time.Second):
		t.Fatal("Bor.Seal silently dropped the result via the second-select default branch; " +
			"expected the goroutine to remain blocked on send (or exit on <-stop) instead. " +
			"This was the leak path for val4-style \"elected but silent\" stalls under load.")
	}
}

func TestSeal_UnauthorizedSigner(t *testing.T) {
t.Parallel()

Expand Down
6 changes: 4 additions & 2 deletions miner/fake_miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,11 @@ func (m *mockBackendBor) BlockChain() *core.BlockChain {
return m.bc
}

// PeerCount implements Backend. The stub reports a single peer so that
// worker code gated on PeerCount()==0 treats the mock as connected;
// tests built on mockBackendBor do not exercise the zero-peer path.
func (*mockBackendBor) PeerCount() int {
	return 1
}

func (m *mockBackendBor) TxPool() *txpool.TxPool {
Expand Down
5 changes: 3 additions & 2 deletions miner/miner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ func (m *mockBackend) BlockChain() *core.BlockChain {
return m.bc
}

// PeerCount implements Backend. The stub reports a single peer so that
// worker code gated on PeerCount()==0 treats the mock as connected;
// miner_test.go does not exercise the zero-peer path.
func (*mockBackend) PeerCount() int {
	return 1
}

func (m *mockBackend) TxPool() *txpool.TxPool {
Expand Down
89 changes: 77 additions & 12 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,10 +675,16 @@
func (w *worker) newWorkLoop(recommit time.Duration) {
defer w.wg.Done()

_, isBor := w.engine.(*bor.Bor)

var (
interrupt *atomic.Int32
minRecommit = recommit // minimal resubmit interval specified by user.
timestamp int64 // timestamp for each round of sealing.
// Stall-detection state for veblopTimer: tracks the last time we
// emitted a stall warning so the log isn't flooded while the
// producer is stuck.
lastStallWarnAt time.Time
)

timer := time.NewTimer(0)
Expand Down Expand Up @@ -743,7 +749,10 @@
veblopTimeout = w.blockTime
}

if w.chainConfig.Bor == nil || !w.chainConfig.Bor.IsRio(currentBlock.Number) {
// Veblop fallback fires for any Bor chain — pre-Rio needs the
// retry to recover after a transient peer outage on real
// network nodes, since startCh fires only once on startup.
if !isBor || w.chainConfig.Bor == nil {
veblopTimer.Reset(veblopTimeout)
continue
}
Expand All @@ -753,6 +762,7 @@
w.pendingMu.RUnlock()

pendingWorkBlock := w.pendingWorkBlock.Load()
lastStallWarnAt = w.warnIfStalled(currentBlock, time.Now().Unix()-int64(currentBlock.Time), veblopTimeout, pendingWorkBlock, hasPendingTasks, lastStallWarnAt)
if pendingWorkBlock == currentBlock.Number.Uint64()+1 {
// Next block is already being worked on, reset the timer.
veblopTimer.Reset(veblopTimeout)
Expand Down Expand Up @@ -829,6 +839,11 @@

bor, isBor := w.engine.(*bor.Bor)
devFakeAuthor := isBor && bor != nil && bor.DevFakeAuthor
// "real-network node" = Bor engine wired to a live heimdall. Test /
// dev setups (--bor.withoutheimdall, Clique, Ethash) leave the
// HeimdallClient nil and bypass the PeerCount gate so single-node
// and intentional-disconnection tests keep producing.
realNetworkNode := isBor && bor != nil && bor.HeimdallClient != nil
for {
select {
case req := <-w.newWorkCh:
Expand All @@ -840,11 +855,17 @@
continue
}

if w.chainConfig.ChainID.Cmp(params.BorMainnetChainConfig.ChainID) == 0 || w.chainConfig.ChainID.Cmp(params.MumbaiChainConfig.ChainID) == 0 || w.chainConfig.ChainID.Cmp(params.AmoyChainConfig.ChainID) == 0 {
if w.eth.PeerCount() > 0 || devFakeAuthor {
//nolint:contextcheck
w.commitWork(req.interrupt, req.noempty, req.timestamp)
}
// PeerCount gate applies only to real-network Bor nodes
// (heimdall configured). Test / dev setups
// (--bor.withoutheimdall, Clique, Ethash, etc.) commit
// unconditionally — single-node and intentional-disconnection
// tests rely on producing without peers.
if realNetworkNode && w.eth.PeerCount() == 0 && !devFakeAuthor {
// Drop the request and unblock the veblop fallback retry.
// In steady state peers > 0, so this firing means we
// tried to commit during a peer outage — worth surfacing.
w.pendingWorkBlock.Store(0)
log.Warn("Dropped newWorkReq: no peers", "head", w.chain.CurrentBlock().Number.Uint64())
} else {
//nolint:contextcheck
w.commitWork(req.interrupt, req.noempty, req.timestamp)
Expand Down Expand Up @@ -948,12 +969,16 @@
prev common.Hash
)

// interrupt aborts the in-flight sealing task.
// interrupt aborts the in-flight sealing task and clears its leaked
// pendingTasks entry (see deletePendingTask).
interrupt := func() {
if stopCh != nil {
close(stopCh)
stopCh = nil
}
if w.deletePendingTask(prev) {
log.Warn("Cleaned leaked pendingTasks entry on interrupt", "sealhash", prev)
}
}
Comment thread
claude[bot] marked this conversation as resolved.

for {
Expand Down Expand Up @@ -1933,16 +1958,17 @@
// commitWork generates several new sealing tasks based on the parent block
// and submit them to the sealer.
func (w *worker) commitWork(interrupt *atomic.Int32, noempty bool, timestamp int64) {
// Must be declared before any early return so pendingWorkBlock is
// always cleared — otherwise the veblop fallback would short-circuit.
defer func() {
w.pendingWorkBlock.Store(0)
}()

// Abort committing if node is still syncing
if w.syncing.Load() {
return
}

// Clear the pending work block number when commitWork completes (success or failure).
defer func() {
w.pendingWorkBlock.Store(0)
}()

// Set the coinbase if the worker is running or it's required
var coinbase common.Address
if w.IsRunning() {
Expand Down Expand Up @@ -2377,6 +2403,45 @@
w.pendingMu.Unlock()
}

// warnIfStalled emits a single WARN per 30s when the chain has been stale
// for >3x the block time AND the veblop fallback can't make progress
// (either pendingWorkBlock thinks work is in flight, or pendingTasks is
// non-empty). Returns the new last-warn timestamp.
func (w *worker) warnIfStalled(currentBlock *types.Header, chainAgeSec int64, veblopTimeout time.Duration, pendingWorkBlock uint64, hasPendingTasks bool, lastWarnAt time.Time) time.Time {
if chainAgeSec <= 3*int64(veblopTimeout.Seconds()) {
return lastWarnAt
}
if pendingWorkBlock != currentBlock.Number.Uint64()+1 && !hasPendingTasks {
return lastWarnAt
}
if time.Since(lastWarnAt) <= 30*time.Second {
return lastWarnAt
}
log.Warn("Possible producer stall: veblop fallback skipping while chain is stale",
"currentBlock", currentBlock.Number.Uint64(),
"chainAgeSec", chainAgeSec,
"veblopTimeoutSec", int64(veblopTimeout.Seconds()),
"pendingWorkBlock", pendingWorkBlock,
"pendingTasksCount", len(w.pendingTasks),
"peerCount", w.eth.PeerCount())

Check failure on line 2426 in miner/worker.go

View check run for this annotation

Claude / Claude Code Review

Data race: warnIfStalled reads len(w.pendingTasks) without holding pendingMu

Data race: `warnIfStalled` at miner/worker.go:2425 reads `len(w.pendingTasks)` without holding `w.pendingMu`. The caller `newWorkLoop` takes `RLock` for `hasPendingTasks` but releases it before this call, while `taskLoop`/`resultLoop`/`clearPending`/`deletePendingTask` all write to the map under `pendingMu.Lock` — `go test -race` will flag this. Fix: compute the count under the same `RLock` that already protects `hasPendingTasks` and pass it as a parameter, mirroring how `hasPendingTasks` is alr
Comment thread
claude[bot] marked this conversation as resolved.
return time.Now()
}

// deletePendingTask removes the pendingTasks entry registered under
// sealHash and reports whether such an entry was present. A zero
// sealhash is treated as "nothing to clean" and returns false without
// taking the lock. Used by taskLoop.interrupt to clean entries that
// resultLoop wouldn't reach because Bor.Seal's stop-branch returns
// silently.
func (w *worker) deletePendingTask(sealHash common.Hash) bool {
	if sealHash == (common.Hash{}) {
		return false
	}
	w.pendingMu.Lock()
	defer w.pendingMu.Unlock()
	if _, ok := w.pendingTasks[sealHash]; !ok {
		return false
	}
	delete(w.pendingTasks, sealHash)
	return true
}

// vmConfig returns the VM config.
func (w *worker) vmConfig() vm.Config {
cfg := *w.chain.GetVMConfig()
Expand Down
Loading
Loading