Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
d63daed
miner, core, consensus/bor: pipelined state root computation (PoC)
pratikspatil024 Mar 30, 2026
ceca519
miner: run speculative fillTransactions concurrently with SRC and rem…
pratikspatil024 Apr 1, 2026
3dcdf80
miner: async DB write, concurrent fill, and interrupt timer improvements
pratikspatil024 Apr 1, 2026
07345ad
llint fix
pratikspatil024 Apr 1, 2026
a86db50
addressed comments and fix test, lint
pratikspatil024 Apr 2, 2026
8d5ed1b
core/stateless: (fix unit test) fix NewWitness zeroing breaking witne…
pratikspatil024 Apr 2, 2026
0e2da86
core, consensus/bor, eth, triedb: pipelined state root computation fo…
pratikspatil024 Apr 9, 2026
b283227
tests/bor: add pipelined import SRC self-destruct integration test
pratikspatil024 Apr 10, 2026
acc0ef7
core/state, triedb/pathdb: fix prefetcher race during pipelined SRC
pratikspatil024 Apr 10, 2026
5d45f02
miner, consensus/bor, core, eth: harden pipelined SRC abort handling
pratikspatil024 Apr 16, 2026
46e9ea4
core, miner, core/state: added metrics for pipelined SRC
pratikspatil024 Apr 21, 2026
caac30e
core, core/state, eth, tests/bor, miner: refactor pipelined-src funct…
pratikspatil024 Apr 22, 2026
d2d6641
core, core/txpool, miner: PR review fixes + pipelined import hardening
pratikspatil024 Apr 23, 2026
946f137
core: added metrics for preloadFlatDiffReads in pipelined SRC
pratikspatil024 Apr 28, 2026
d00791d
core: added metrics - cheap exec and auto-collection phases for pipel…
pratikspatil024 Apr 28, 2026
68e80ac
core: stop execution prefetcher in pipelined import path
pratikspatil024 Apr 30, 2026
8ef3578
core, miner: honour producewitnesses in pipelined import SRC path
pratikspatil024 May 1, 2026
bbaa0d6
core: use multi-reader StateDB on pipelined SRC witness-off path
pratikspatil024 May 4, 2026
fcab1bc
core, core/stateless, miner: share execution witness with pipelined SRC
pratikspatil024 May 5, 2026
ecf6ebf
core, core/state, eth, internal/cli, miner: add warm-snapshot handoff…
pratikspatil024 May 5, 2026
d9e22cd
core/state: use warm snapshot for SRC commit trie opens
pratikspatil024 May 6, 2026
696c3c1
core, core/state: add import phase observability
pratikspatil024 May 6, 2026
1bf3f2b
core, core/state: split pipelined prefetch-stop observability
pratikspatil024 May 6, 2026
e68753e
core, core/state: build warm snapshot inside SRC goroutine
pratikspatil024 May 7, 2026
8a24f4e
core/state: make pipelined SRC prefetch stop snapshot-fast
pratikspatil024 May 7, 2026
665746d
core/state: bound snapshot-fast prefetch drain latency
pratikspatil024 May 7, 2026
1d304ca
core, core/state, miner: detach import prefetcher for SRC
pratikspatil024 May 9, 2026
b5f9c0e
consensus, miner: restore Bor early block announcement timing
pratikspatil024 May 13, 2026
4971d12
miner, cli: disable production pipelined SRC
pratikspatil024 May 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
250 changes: 167 additions & 83 deletions consensus/bor/bor.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,16 @@
inmemorySnapshots = 128 // Number of recent vote snapshots to keep in memory
inmemorySignatures = 4096 // Number of recent block signatures to keep in memory
veblopBlockTimeout = time.Second * 8 // Timeout for new span check. DO NOT CHANGE THIS VALUE.
minBlockBuildTime = 1 * time.Second // Minimum remaining time before extending the block deadline to avoid empty blocks
// minBlockBuildTime is the minimum remaining time before Prepare() extends
// the block deadline to avoid producing empty blocks. If time.Until(target)
// is less than this value, the target timestamp is pushed forward by one
// blockTime period.
//
// Abort-recovery rebuilds from pipelined SRC are exempt from this push. By the
// time speculative execution is discarded, most of the slot may already be
// gone; moving the header to the next slot would create avoidable 3-second
// blocks on 2-second devnets.
minBlockBuildTime = 1 * time.Second
)

// Bor protocol constants.
Expand Down Expand Up @@ -1007,6 +1016,37 @@
}
}

func (c *Bor) parentActualTime(parent *types.Header, parentHash common.Hash) time.Time {
parentBlockTime := time.Unix(int64(parent.Time), 0)
parentActualBlockTime := parentBlockTime
if c.parentActualTimeCache != nil {
if v, ok := c.parentActualTimeCache.Get(parentHash); ok {
if at, ok := v.(time.Time); ok && at.After(parentBlockTime) {
parentActualBlockTime = at
}
}
}
return parentActualBlockTime
}

// EarliestAnnounceTime returns the earliest local time at which a prepared
// block can be announced without violating Bor's post-Giugliano future-block
// checks. Primary producers may announce before the block's own timestamp, but
// not before the parent slot boundary.
func (c *Bor) EarliestAnnounceTime(chain consensus.ChainHeaderReader, header *types.Header) time.Time {
if header == nil || header.Number == nil || header.Number.Sign() == 0 {
return time.Now()
}
if !c.config.IsGiugliano(header.Number) {
return header.GetActualTime()
}
parent := chain.GetHeader(header.ParentHash, header.Number.Uint64()-1)
if parent == nil {
return header.GetActualTime()
}
return c.parentActualTime(parent, header.ParentHash)
}

// Prepare implements consensus.Engine, preparing all the consensus fields of the
// header for running the transactions on top.
func (c *Bor) Prepare(chain consensus.ChainHeaderReader, header *types.Header, waitOnPrepare bool) error {
Expand Down Expand Up @@ -1117,17 +1157,7 @@
if c.blockTime > 0 && c.config.IsRio(header.Number) {
// Only enable custom block time for Rio and later

parentBlockTime := time.Unix(int64(parent.Time), 0)
// Default to parent block timestamp
parentActualBlockTime := parentBlockTime
// If we have the parent's ActualTime locally (by parent hash), prefer it
if c.parentActualTimeCache != nil {
if v, ok := c.parentActualTimeCache.Get(header.ParentHash); ok {
if at, ok := v.(time.Time); ok && at.After(parentBlockTime) {
parentActualBlockTime = at
}
}
}
parentActualBlockTime := c.parentActualTime(parent, header.ParentHash)
actualNewBlockTime := parentActualBlockTime.Add(c.blockTime)
header.Time = uint64(actualNewBlockTime.Unix())
header.ActualTime = actualNewBlockTime
Expand All @@ -1145,24 +1175,34 @@
// Ensure minimum build time so the block has enough time to include transactions.
// The interrupt timer reserves 500ms for state root computation, so without
// sufficient remaining time the block would end up empty.
if time.Until(header.GetActualTime()) < minBlockBuildTime {
//
// Abort-recovery rebuilds are different: speculative execution has already
// spent most of the slot, so pushing them again would create an avoidable
// extra block-time gap. Those late rebuilds should keep their original slot.
if !header.AbortRecovery && time.Until(header.GetActualTime()) < minBlockBuildTime {
header.Time = uint64(now.Add(blockTime).Unix())
if c.blockTime > 0 && c.config.IsRio(header.Number) {
header.ActualTime = now.Add(blockTime)
}
}

// Wait before start the block production if needed (previously this wait was on Seal)
// Giugliano introduced early block announcements: primary producers wait
// until the parent slot boundary before building, then Seal can return
// immediately and announce the block before its own timestamp. Speculative
// and prefetch callers pass waitOnPrepare=false because they intentionally
// build ahead and perform their own parent-boundary wait before sealing.
if c.config.IsGiugliano(header.Number) && waitOnPrepare {
var successionNumber int
// if signer is not empty (RPC nodes have empty signer)
if currentSigner.signer != (common.Address{}) {
var err error
successionNumber, err = snap.GetSignerSuccessionNumber(currentSigner.signer)
if err != nil {
return err
}
if successionNumber == 0 {
// Avoid allocating a timer when the parent boundary has already
// passed. This is equivalent to develop's immediate time.After path
// for non-positive delays, just cheaper and more explicit.
if successionNumber == 0 && delay > 0 {
<-time.After(delay)
}
}
Expand Down Expand Up @@ -1193,7 +1233,7 @@
// check and commit span
if !c.config.IsRio(header.Number) {
if err := c.checkAndCommitSpan(wrappedState, header, cx); err != nil {
log.Error("Error while committing span", "error", err)

Check failure on line 1236 in consensus/bor/bor.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define a constant instead of duplicating this literal "Error while committing span" 3 times.

See more on https://sonarcloud.io/project/issues?id=0xPolygon_bor&issues=AZ4g46135j4Bp7B3gmWL&open=AZ4g46135j4Bp7B3gmWL&pullRequest=2180
return nil
}
}
Expand All @@ -1202,7 +1242,7 @@
// commit states
stateSyncData, err = c.CommitStates(wrappedState, header, cx)
if err != nil {
log.Error("Error while committing states", "error", err)

Check failure on line 1245 in consensus/bor/bor.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define a constant instead of duplicating this literal "Error while committing states" 3 times.

See more on https://sonarcloud.io/project/issues?id=0xPolygon_bor&issues=AZ4g46135j4Bp7B3gmWN&open=AZ4g46135j4Bp7B3gmWN&pullRequest=2180
return nil
}
}
Expand All @@ -1215,7 +1255,7 @@
// the wrapped state here as it may have a hooked state db instance which can help
// in tracing if it's enabled.
if err = c.changeContractCodeIfNeeded(headerNumber, wrappedState); err != nil {
log.Error("Error changing contract code", "error", err)

Check failure on line 1258 in consensus/bor/bor.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define a constant instead of duplicating this literal "Error changing contract code" 3 times.

See more on https://sonarcloud.io/project/issues?id=0xPolygon_bor&issues=AZ4g46135j4Bp7B3gmWM&open=AZ4g46135j4Bp7B3gmWM&pullRequest=2180
return nil
}

Expand Down Expand Up @@ -1361,25 +1401,9 @@
return nil, nil, 0, err
}

// No block rewards in PoA, so the state remains as it is
start := time.Now()

// No block rewards in PoA, so the state remains as it is.
// Under delayed SRC, header.Root stores the parent block's actual state root;
// the goroutine in BlockChain.spawnSRCGoroutine handles this block's root.
if c.chainConfig.Bor != nil && c.chainConfig.Bor.IsDelayedSRC(header.Number) {
dsrcReader, ok := chain.(core.DelayedSRCReader)
if !ok {
return nil, nil, 0, fmt.Errorf("chain does not implement DelayedSRCReader")
}
parentRoot := dsrcReader.GetPostStateRoot(header.ParentHash)
if parentRoot == (common.Hash{}) {
return nil, nil, 0, fmt.Errorf("delayed state root unavailable for parent %s", header.ParentHash)
}
header.Root = parentRoot
} else {
header.Root = state.IntermediateRoot(chain.Config().IsEIP158(header.Number))
}

header.Root = state.IntermediateRoot(chain.Config().IsEIP158(header.Number))
commitTime := time.Since(start)

// Uncles are dropped
Expand All @@ -1404,6 +1428,81 @@
return block, receipts, commitTime, nil
}

// FinalizeForPipeline runs the same post-transaction state modifications as
// FinalizeAndAssemble (state sync, span commits, contract code changes) but
// does NOT compute IntermediateRoot or assemble the block. It returns the
// stateSyncData so the caller can pass it to AssembleBlock later after the
// background SRC goroutine has computed the state root.
//
// This is the pipelined SRC equivalent of the first half of FinalizeAndAssemble.
func (c *Bor) FinalizeForPipeline(chain consensus.ChainHeaderReader, header *types.Header, statedb *state.StateDB, body *types.Body, receipts []*types.Receipt) ([]*types.StateSyncData, error) {
headerNumber := header.Number.Uint64()
if body.Withdrawals != nil || header.WithdrawalsHash != nil {
return nil, consensus.ErrUnexpectedWithdrawals
}
if header.RequestsHash != nil {
return nil, consensus.ErrUnexpectedRequests
}

var (
stateSyncData []*types.StateSyncData
err error
)

if IsSprintStart(headerNumber, c.config.CalculateSprint(headerNumber)) {
cx := statefull.ChainContext{Chain: chain, Bor: c}

if !c.config.IsRio(header.Number) {
if err = c.checkAndCommitSpan(statedb, header, cx); err != nil {
log.Error("Error while committing span", "error", err)
return nil, err
}
}

if c.HeimdallClient != nil {
stateSyncData, err = c.CommitStates(statedb, header, cx)
if err != nil {
log.Error("Error while committing states", "error", err)
return nil, err
}
}
}

if err = c.changeContractCodeIfNeeded(headerNumber, statedb); err != nil {
log.Error("Error changing contract code", "error", err)
return nil, err
}

return stateSyncData, nil
}

// AssembleBlock constructs the final block from a pre-computed state root,
// without calling IntermediateRoot. This is used by pipelined SRC where the
// state root is computed by a background goroutine.
//
// stateSyncData is the state sync data collected during Finalize(). If non-nil
// and the Madhugiri fork is active, a StateSyncTx is appended to the body.
func (c *Bor) AssembleBlock(chain consensus.ChainHeaderReader, header *types.Header, statedb *state.StateDB, body *types.Body, receipts []*types.Receipt, stateRoot common.Hash, stateSyncData []*types.StateSyncData) (*types.Block, []*types.Receipt, error) {
headerNumber := header.Number.Uint64()

header.Root = stateRoot
header.UncleHash = types.CalcUncleHash(nil)

if len(stateSyncData) > 0 && c.config != nil && c.config.IsMadhugiri(big.NewInt(int64(headerNumber))) {
stateSyncTx := types.NewTx(&types.StateSyncTx{
StateSyncData: stateSyncData,
})
body.Transactions = append(body.Transactions, stateSyncTx)
receipts = insertStateSyncTransactionAndCalculateReceipt(stateSyncTx, header, body, statedb, receipts)
} else {
bc := chain.(core.BorStateSyncer)
bc.SetStateSync(stateSyncData)
}

block := types.NewBlock(header, body, receipts, trie.NewStackTrie(nil))
return block, receipts, nil
}

// Authorize injects a private key into the consensus engine to mint new blocks
// with.
func (c *Bor) Authorize(currentSigner common.Address, signFn SignerFn) {
Expand Down Expand Up @@ -1449,11 +1548,15 @@

var delay time.Duration

// Sweet, the protocol permits us to sign the block, wait for our time
// Sweet, the protocol permits us to sign the block, wait for our time.
// On Giugliano+ primary producers, the wait is performed before building
// in Prepare (or explicitly by the pipeline at the parent boundary), so Seal
// returns immediately and preserves early block announcement. Backups still
// wait until the block timestamp.
if c.config.IsGiugliano(header.Number) && successionNumber == 0 {
delay = 0 // delay was moved to Prepare for giugliano and later
delay = 0
} else {
delay = time.Until(header.GetActualTime()) // Wait until we reach header time
delay = time.Until(header.GetActualTime())
}

// wiggle was already accounted for in header.Time, this is just for logging
Expand All @@ -1470,7 +1573,13 @@
}

// Wait until sealing is terminated or delay timeout.
log.Info("Waiting for slot to sign and propagate", "number", number, "hash", header.Hash(), "delay-in-sec", uint(delay), "delay", common.PrettyDuration(delay))
log.Info(
"Waiting for slot to sign and propagate",
"number", number,
"hash", header.Hash(),
"delay-ms", float64(delay)/float64(time.Millisecond),
"delay", common.PrettyDuration(delay),
)

go func() {
select {
Expand All @@ -1483,7 +1592,7 @@
"Sealing out-of-turn",
"number", number,
"hash", header.Hash,
"wiggle-in-sec", uint(wiggle),
"wiggle-ms", float64(wiggle)/float64(time.Millisecond),
"wiggle", common.PrettyDuration(wiggle),
"in-turn-signer", snap.ValidatorSet.GetProposer().Address.Hex(),
)
Expand Down Expand Up @@ -1597,38 +1706,22 @@
headerNumber := header.Number.Uint64()

tempState := state.Inner().Copy()
if c.chainConfig.Bor != nil && c.chainConfig.Bor.IsDelayedSRC(header.Number) {
// Under delayed SRC, skip ResetPrefetcher + StartPrefetcher.
// The full-node state is at root_{N-2} with a FlatDiff overlay
// approximating root_{N-1}. ResetPrefetcher clears that overlay,
// causing GetCurrentSpan to read stale root_{N-2} values — different
// from what the stateless node sees at root_{N-1}. The mismatch leads
// to different storage-slot access patterns, so the SRC goroutine
// captures the wrong trie nodes.
//
// StartPrefetcher is also unnecessary: the witness is built by the
// SRC goroutine, and tempState's reads are captured via
// CommitSnapshot + TouchAllAddresses below.
} else {
tempState.ResetPrefetcher()
tempState.StartPrefetcher("bor", state.Witness(), nil)
}
tempState.ResetPrefetcher()
tempState.StartPrefetcher("bor", state.Witness(), nil)

span, err := c.spanner.GetCurrentSpan(ctx, header.ParentHash, tempState)
if err != nil {
return err
}

if c.chainConfig.Bor != nil && c.chainConfig.Bor.IsDelayedSRC(header.Number) {
// Under delayed SRC, use CommitSnapshot instead of IntermediateRoot
// to capture all accesses without computing a trie root. Touch
// every address on the main state so they appear in the block's
// FlatDiff and the SRC goroutine includes their trie paths in
// the witness.
tempState.CommitSnapshot(false).TouchAllAddresses(state.Inner())
} else {
tempState.IntermediateRoot(false)
}
tempState.IntermediateRoot(false)

// Propagate addresses accessed during GetCurrentSpan back to the original
// state so they appear in the FlatDiff ReadSet. Without this, the pipelined
// SRC goroutine's witness won't capture their trie proof nodes (the copy's
// reads aren't tracked on the original), causing stateless execution to fail
// with missing trie nodes for the validator contract.
tempState.PropagateReadsTo(state.Inner())

if c.needToCommitSpan(span, headerNumber) {
return c.FetchAndCommitSpan(ctx, span.Id+1, state, header, chain)
Expand Down Expand Up @@ -1765,30 +1858,21 @@
if c.config.IsIndore(header.Number) {
// Fetch the LastStateId from contract via current state instance
tempState := state.Inner().Copy()
if c.chainConfig.Bor != nil && c.chainConfig.Bor.IsDelayedSRC(header.Number) {
// See comment in checkAndCommitSpan: under delayed SRC,
// skip ResetPrefetcher + StartPrefetcher to preserve the
// FlatDiff overlay and avoid stale root_{N-2} reads.
} else {
tempState.ResetPrefetcher()
tempState.StartPrefetcher("bor", state.Witness(), nil)
}
tempState.ResetPrefetcher()
tempState.StartPrefetcher("bor", state.Witness(), nil)

lastStateIDBig, err = c.GenesisContractsClient.LastStateId(tempState, number-1, header.ParentHash)
if err != nil {
return nil, err
}

if c.chainConfig.Bor != nil && c.chainConfig.Bor.IsDelayedSRC(header.Number) {
// Under delayed SRC, use CommitSnapshot instead of
// IntermediateRoot to capture all accesses without computing
// a trie root. Touch every address on the main state so they
// appear in the block's FlatDiff and the SRC goroutine
// includes their trie paths in the witness.
tempState.CommitSnapshot(false).TouchAllAddresses(state.Inner())
} else {
tempState.IntermediateRoot(false)
}
tempState.IntermediateRoot(false)

// Propagate addresses accessed during LastStateId back to the original
// state so they appear in the FlatDiff ReadSet. Without this, the
// pipelined SRC goroutine's witness won't capture their trie proof
// nodes, causing stateless execution to fail with missing trie nodes.
tempState.PropagateReadsTo(state.Inner())

stateSyncDelay := c.config.CalculateStateSyncDelay(number)
to = time.Unix(int64(header.Time-stateSyncDelay), 0)
Expand Down
Loading
Loading