Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/evm/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.25.7

replace (
github.com/evstack/ev-node => ../../
github.com/evstack/ev-node/core => ../../core
github.com/evstack/ev-node/execution/evm => ../../execution/evm
)

Expand Down
1 change: 1 addition & 0 deletions apps/grpc/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.25.7

replace (
github.com/evstack/ev-node => ../../
github.com/evstack/ev-node/core => ../../core
github.com/evstack/ev-node/execution/grpc => ../../execution/grpc
)

Expand Down
5 changes: 4 additions & 1 deletion apps/testapp/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ module github.com/evstack/ev-node/apps/testapp

go 1.25.7

replace github.com/evstack/ev-node => ../../.
replace (
github.com/evstack/ev-node => ../../.
github.com/evstack/ev-node/core => ../../core
)

require (
github.com/evstack/ev-node v1.1.1
Expand Down
14 changes: 7 additions & 7 deletions apps/testapp/kv/kvexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,16 +239,16 @@ func (k *KVExecutor) GetTxs(ctx context.Context) ([][]byte, error) {
// ExecuteTxs processes each transaction assumed to be in the format "key=value".
// It updates the database accordingly using a batch and removes the executed transactions from the mempool.
// Invalid transactions are filtered out and logged, but execution continues.
func (k *KVExecutor) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint64, timestamp time.Time, prevStateRoot []byte) ([]byte, error) {
func (k *KVExecutor) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint64, timestamp time.Time, prevStateRoot []byte) (execution.ExecuteResult, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
return execution.ExecuteResult{}, ctx.Err()
default:
}

batch, err := k.db.Batch(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create database batch: %w", err)
return execution.ExecuteResult{}, fmt.Errorf("failed to create database batch: %w", err)
}

validTxCount := 0
Expand Down Expand Up @@ -291,7 +291,7 @@ func (k *KVExecutor) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight u
err = batch.Put(ctx, dsKey, []byte(value))
if err != nil {
// This error is unlikely for Put unless the context is cancelled.
return nil, fmt.Errorf("failed to stage put operation in batch for key '%s': %w", key, err)
return execution.ExecuteResult{}, fmt.Errorf("failed to stage put operation in batch for key '%s': %w", key, err)
}
validTxCount++
}
Expand All @@ -304,7 +304,7 @@ func (k *KVExecutor) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight u
// Commit the batch to apply all changes atomically
err = batch.Commit(ctx)
if err != nil {
return nil, fmt.Errorf("failed to commit transaction batch: %w", err)
return execution.ExecuteResult{}, fmt.Errorf("failed to commit transaction batch: %w", err)
}

k.blocksProduced.Add(1)
Expand All @@ -315,10 +315,10 @@ func (k *KVExecutor) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight u
if err != nil {
// This is problematic, state was changed but root calculation failed.
// May need more robust error handling or recovery logic.
return nil, fmt.Errorf("failed to compute state root after executing transactions: %w", err)
return execution.ExecuteResult{}, fmt.Errorf("failed to compute state root after executing transactions: %w", err)
}

return stateRoot, nil
return execution.ExecuteResult{UpdatedStateRoot: stateRoot}, nil
}

// SetFinal marks a block as finalized at the specified height.
Expand Down
12 changes: 6 additions & 6 deletions apps/testapp/kv/kvexecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,13 @@ func TestExecuteTxs_Valid(t *testing.T) {
[]byte("key2=value2"),
}

stateRoot, err := exec.ExecuteTxs(ctx, txs, 1, time.Now(), []byte(""))
result, err := exec.ExecuteTxs(ctx, txs, 1, time.Now(), []byte(""))
if err != nil {
t.Fatalf("ExecuteTxs failed: %v", err)
}

// Check that stateRoot contains the updated key-value pairs
rootStr := string(stateRoot)
rootStr := string(result.UpdatedStateRoot)
if !strings.Contains(rootStr, "key1:value1;") || !strings.Contains(rootStr, "key2:value2;") {
t.Errorf("State root does not contain expected key-values: %s", rootStr)
}
Expand All @@ -134,13 +134,13 @@ func TestExecuteTxs_Invalid(t *testing.T) {
[]byte(""),
}

stateRoot, err := exec.ExecuteTxs(ctx, txs, 1, time.Now(), []byte(""))
result, err := exec.ExecuteTxs(ctx, txs, 1, time.Now(), []byte(""))
if err != nil {
t.Fatalf("ExecuteTxs should handle gibberish gracefully, got error: %v", err)
}

// State root should still be computed (empty block is valid)
if stateRoot == nil {
if result.UpdatedStateRoot == nil {
t.Error("Expected non-nil state root even with all invalid transactions")
}

Expand All @@ -152,13 +152,13 @@ func TestExecuteTxs_Invalid(t *testing.T) {
[]byte(""),
}

stateRoot2, err := exec.ExecuteTxs(ctx, mixedTxs, 2, time.Now(), stateRoot)
result2, err := exec.ExecuteTxs(ctx, mixedTxs, 2, time.Now(), result.UpdatedStateRoot)
if err != nil {
t.Fatalf("ExecuteTxs should filter invalid transactions and process valid ones, got error: %v", err)
}

// State root should contain only the valid transactions
rootStr := string(stateRoot2)
rootStr := string(result2.UpdatedStateRoot)
if !strings.Contains(rootStr, "valid_key:valid_value") || !strings.Contains(rootStr, "another_valid:value2") {
t.Errorf("State root should contain valid transactions: %s", rootStr)
}
Expand Down
44 changes: 38 additions & 6 deletions block/internal/common/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,12 @@ func (s *Replayer) replayBlock(ctx context.Context, height uint64) error {
if height == s.genesis.InitialHeight {
// For the first block, use genesis state.
prevState = types.State{
ChainID: s.genesis.ChainID,
InitialHeight: s.genesis.InitialHeight,
LastBlockHeight: s.genesis.InitialHeight - 1,
LastBlockTime: s.genesis.StartTime,
AppHash: header.AppHash, // Genesis app hash (input to first block execution)
ChainID: s.genesis.ChainID,
InitialHeight: s.genesis.InitialHeight,
LastBlockHeight: s.genesis.InitialHeight - 1,
LastBlockTime: s.genesis.StartTime,
AppHash: header.AppHash, // Genesis app hash (input to first block execution)
NextProposerAddress: append([]byte(nil), s.genesis.ProposerAddress...),
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
}
} else {
// GetStateAtHeight(height-1) returns the state AFTER block height-1 was executed,
Expand All @@ -179,10 +180,25 @@ func (s *Replayer) replayBlock(ctx context.Context, height uint64) error {
Int("tx_count", len(rawTxs)).
Msg("executing transactions on execution layer")

newAppHash, err := s.exec.ExecuteTxs(ctx, rawTxs, height, header.Time(), prevState.AppHash)
result, err := s.exec.ExecuteTxs(ctx, rawTxs, height, header.Time(), prevState.AppHash)
if err != nil {
return fmt.Errorf("failed to execute transactions: %w", err)
}
newAppHash := result.UpdatedStateRoot
if len(result.NextProposerAddress) > 0 {
if len(header.NextProposerAddress) == 0 {
return fmt.Errorf("next proposer mismatch at height %d: header empty, execution %x", height, result.NextProposerAddress)
}
if !bytes.Equal(header.NextProposerAddress, result.NextProposerAddress) {
return fmt.Errorf("next proposer mismatch at height %d: header %x, execution %x",
height,
header.NextProposerAddress,
result.NextProposerAddress,
)
}
} else if len(header.NextProposerAddress) > 0 && !bytes.Equal(header.NextProposerAddress, header.ProposerAddress) {
return fmt.Errorf("next proposer mismatch at height %d: header %x, execution unchanged", height, header.NextProposerAddress)
}

// The result of ExecuteTxs (newAppHash) should match the stored state at this height.
// Note: header.AppHash is the PREVIOUS state's app hash (input), not the expected output.
Expand All @@ -207,6 +223,22 @@ func (s *Replayer) replayBlock(ctx context.Context, height uint64) error {
Msg("app hash mismatch during replay")
return err
}
if len(expectedState.NextProposerAddress) > 0 {
expectedNextProposer := header.NextProposerAddress
if len(expectedNextProposer) == 0 {
expectedNextProposer = result.NextProposerAddress
}
if len(expectedNextProposer) == 0 {
expectedNextProposer = header.ProposerAddress
}
if !bytes.Equal(expectedNextProposer, expectedState.NextProposerAddress) {
return fmt.Errorf("next proposer mismatch at height %d: expected %x got %x",
height,
expectedState.NextProposerAddress,
expectedNextProposer,
)
}
}
s.logger.Debug().
Uint64("height", height).
Str("app_hash", hex.EncodeToString(newAppHash)).
Expand Down
103 changes: 78 additions & 25 deletions block/internal/executing/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,6 @@ func NewExecutor(
return nil, errors.New("signer cannot be nil")
}

addr, err := signer.GetAddress()
if err != nil {
return nil, fmt.Errorf("failed to get address: %w", err)
}

if !bytes.Equal(addr, genesis.ProposerAddress) {
return nil, common.ErrNotProposer
}
}
if raftNode != nil && reflect.ValueOf(raftNode).IsNil() {
raftNode = nil
Expand Down Expand Up @@ -242,15 +234,22 @@ func (e *Executor) initializeState() error {
}

state = types.State{
ChainID: e.genesis.ChainID,
InitialHeight: e.genesis.InitialHeight,
LastBlockHeight: e.genesis.InitialHeight - 1,
LastBlockTime: e.genesis.StartTime,
AppHash: stateRoot,
ChainID: e.genesis.ChainID,
InitialHeight: e.genesis.InitialHeight,
LastBlockHeight: e.genesis.InitialHeight - 1,
LastBlockTime: e.genesis.StartTime,
AppHash: stateRoot,
NextProposerAddress: e.initialProposerAddress(e.ctx),
// DA start height is usually 0 at InitChain unless it is a re-genesis or a based sequencer.
DAHeight: e.genesis.DAStartHeight,
}
}
if len(state.NextProposerAddress) == 0 {
state.NextProposerAddress = e.initialProposerAddress(e.ctx)
}
if err := e.assertConfiguredSigner(state.NextProposerAddress); err != nil {
return err
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

if e.raftNode != nil {
// Ensure node is fully synced before producing any blocks
Expand Down Expand Up @@ -379,6 +378,32 @@ func (e *Executor) initializeState() error {
return nil
}

func (e *Executor) initialProposerAddress(ctx context.Context) []byte {
if e.exec != nil {
info, err := e.exec.GetExecutionInfo(ctx)
if err != nil {
e.logger.Warn().Err(err).Msg("failed to get execution info for proposer, falling back to genesis proposer")
} else if len(info.NextProposerAddress) > 0 {
return append([]byte(nil), info.NextProposerAddress...)
}
}
return append([]byte(nil), e.genesis.ProposerAddress...)
}

func (e *Executor) assertConfiguredSigner(expectedProposer []byte) error {
if e.config.Node.BasedSequencer {
return nil
}
addr, err := e.signer.GetAddress()
if err != nil {
return fmt.Errorf("failed to get address: %w", err)
}
if !bytes.Equal(addr, expectedProposer) {
return common.ErrNotProposer
}
return nil
}

// executionLoop handles block production and aggregation
func (e *Executor) executionLoop() {
e.logger.Info().Msg("starting execution loop")
Expand Down Expand Up @@ -547,6 +572,13 @@ func (e *Executor) ProduceBlock(ctx context.Context) error {
if err != nil {
return fmt.Errorf("failed to apply block: %w", err)
}
if !bytes.Equal(newState.NextProposerAddress, header.ProposerAddress) {
header.NextProposerAddress = append([]byte(nil), newState.NextProposerAddress...)
header.InvalidateHash()
} else if len(header.NextProposerAddress) > 0 {
header.NextProposerAddress = nil
header.InvalidateHash()
}

// set the DA height in the sequencer
newState.DAHeight = e.sequencer.GetDAHeight()
Expand Down Expand Up @@ -696,6 +728,10 @@ func (e *Executor) RetrieveBatch(ctx context.Context) (*BatchData, error) {
func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *BatchData) (*types.SignedHeader, *types.Data, error) {
currentState := e.getLastState()
headerTime := uint64(e.genesis.StartTime.UnixNano())
proposerAddress := currentState.NextProposerAddress
if len(proposerAddress) == 0 {
proposerAddress = e.genesis.ProposerAddress
}

var lastHeaderHash types.Hash
var lastDataHash types.Hash
Expand Down Expand Up @@ -736,14 +772,21 @@ func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *Ba
if err != nil {
return nil, nil, fmt.Errorf("failed to get public key: %w", err)
}
addr, err := e.signer.GetAddress()
if err != nil {
return nil, nil, fmt.Errorf("failed to get address: %w", err)
}
if !bytes.Equal(addr, proposerAddress) {
return nil, nil, common.ErrNotProposer
}

validatorHash, err = e.options.ValidatorHasherProvider(e.genesis.ProposerAddress, pubKey)
validatorHash, err = e.options.ValidatorHasherProvider(proposerAddress, pubKey)
if err != nil {
return nil, nil, fmt.Errorf("failed to get validator hash: %w", err)
}
} else {
var err error
validatorHash, err = e.options.ValidatorHasherProvider(e.genesis.ProposerAddress, nil)
validatorHash, err = e.options.ValidatorHasherProvider(proposerAddress, nil)
if err != nil {
return nil, nil, fmt.Errorf("failed to get validator hash: %w", err)
}
Expand All @@ -763,13 +806,13 @@ func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *Ba
},
LastHeaderHash: lastHeaderHash,
AppHash: currentState.AppHash,
ProposerAddress: e.genesis.ProposerAddress,
ProposerAddress: proposerAddress,
ValidatorHash: validatorHash,
},
Signature: lastSignature,
Signer: types.Signer{
PubKey: pubKey,
Address: e.genesis.ProposerAddress,
Address: proposerAddress,
},
}

Expand Down Expand Up @@ -813,14 +856,24 @@ func (e *Executor) ApplyBlock(ctx context.Context, header types.Header, data *ty
// Execute transactions
execCtx := context.WithValue(ctx, types.HeaderContextKey, header)

newAppHash, err := e.executeTxsWithRetry(execCtx, rawTxs, header, currentState)
result, err := e.executeTxsWithRetry(execCtx, rawTxs, header, currentState)
if err != nil {
e.sendCriticalError(fmt.Errorf("failed to execute transactions: %w", err))
return types.State{}, fmt.Errorf("failed to execute transactions: %w", err)
}
if len(result.NextProposerAddress) > 0 {
if len(header.NextProposerAddress) == 0 {
header.NextProposerAddress = append([]byte(nil), result.NextProposerAddress...)
} else if !bytes.Equal(header.NextProposerAddress, result.NextProposerAddress) {
return types.State{}, fmt.Errorf("next proposer mismatch: header %x, execution %x", header.NextProposerAddress, result.NextProposerAddress)
}
header.InvalidateHash()
} else if len(header.NextProposerAddress) > 0 && !bytes.Equal(header.NextProposerAddress, header.ProposerAddress) {
return types.State{}, fmt.Errorf("next proposer mismatch: header %x, execution unchanged", header.NextProposerAddress)
}

// Create new state
newState, err := currentState.NextState(header, newAppHash)
newState, err := currentState.NextState(header, result.UpdatedStateRoot)
if err != nil {
return types.State{}, fmt.Errorf("failed to create next state: %w", err)
}
Expand Down Expand Up @@ -851,12 +904,12 @@ func (e *Executor) signHeader(ctx context.Context, header *types.Header) (types.

// executeTxsWithRetry executes transactions with retry logic.
// NOTE: the function retries the execution client call regardless of the error. Some execution clients errors are irrecoverable, and will eventually halt the node, as expected.
func (e *Executor) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, header types.Header, currentState types.State) ([]byte, error) {
func (e *Executor) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, header types.Header, currentState types.State) (coreexecutor.ExecuteResult, error) {
for attempt := 1; attempt <= common.MaxRetriesBeforeHalt; attempt++ {
newAppHash, err := e.exec.ExecuteTxs(ctx, rawTxs, header.Height(), header.Time(), currentState.AppHash)
result, err := e.exec.ExecuteTxs(ctx, rawTxs, header.Height(), header.Time(), currentState.AppHash)
if err != nil {
if attempt == common.MaxRetriesBeforeHalt {
return nil, fmt.Errorf("failed to execute transactions: %w", err)
return coreexecutor.ExecuteResult{}, fmt.Errorf("failed to execute transactions: %w", err)
}

e.logger.Error().Err(err).
Expand All @@ -869,14 +922,14 @@ func (e *Executor) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, hea
case <-time.After(common.MaxRetriesTimeout):
continue
case <-e.ctx.Done():
return nil, fmt.Errorf("context cancelled during retry: %w", e.ctx.Err())
return coreexecutor.ExecuteResult{}, fmt.Errorf("context cancelled during retry: %w", e.ctx.Err())
}
}

return newAppHash, nil
return result, nil
}

return nil, nil
return coreexecutor.ExecuteResult{}, nil
}

// sendCriticalError sends a critical error to the error channel without blocking
Expand Down
Loading
Loading