Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/evm/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.25.7

replace (
github.com/evstack/ev-node => ../../
github.com/evstack/ev-node/core => ../../core
github.com/evstack/ev-node/execution/evm => ../../execution/evm
)

Expand Down
1 change: 1 addition & 0 deletions apps/grpc/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.25.7

replace (
github.com/evstack/ev-node => ../../
github.com/evstack/ev-node/core => ../../core
github.com/evstack/ev-node/execution/grpc => ../../execution/grpc
)

Expand Down
5 changes: 4 additions & 1 deletion apps/testapp/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ module github.com/evstack/ev-node/apps/testapp

go 1.25.7

replace github.com/evstack/ev-node => ../../.
replace (
github.com/evstack/ev-node => ../../.
github.com/evstack/ev-node/core => ../../core
)

require (
github.com/evstack/ev-node v1.1.1
Expand Down
14 changes: 7 additions & 7 deletions apps/testapp/kv/kvexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,16 +239,16 @@ func (k *KVExecutor) GetTxs(ctx context.Context) ([][]byte, error) {
// ExecuteTxs processes each transaction assumed to be in the format "key=value".
// It updates the database accordingly using a batch and removes the executed transactions from the mempool.
// Invalid transactions are filtered out and logged, but execution continues.
func (k *KVExecutor) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint64, timestamp time.Time, prevStateRoot []byte) ([]byte, error) {
func (k *KVExecutor) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint64, timestamp time.Time, prevStateRoot []byte) (execution.ExecuteResult, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
return execution.ExecuteResult{}, ctx.Err()
default:
}

batch, err := k.db.Batch(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create database batch: %w", err)
return execution.ExecuteResult{}, fmt.Errorf("failed to create database batch: %w", err)
}

validTxCount := 0
Expand Down Expand Up @@ -291,7 +291,7 @@ func (k *KVExecutor) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight u
err = batch.Put(ctx, dsKey, []byte(value))
if err != nil {
// This error is unlikely for Put unless the context is cancelled.
return nil, fmt.Errorf("failed to stage put operation in batch for key '%s': %w", key, err)
return execution.ExecuteResult{}, fmt.Errorf("failed to stage put operation in batch for key '%s': %w", key, err)
}
validTxCount++
}
Expand All @@ -304,7 +304,7 @@ func (k *KVExecutor) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight u
// Commit the batch to apply all changes atomically
err = batch.Commit(ctx)
if err != nil {
return nil, fmt.Errorf("failed to commit transaction batch: %w", err)
return execution.ExecuteResult{}, fmt.Errorf("failed to commit transaction batch: %w", err)
}

k.blocksProduced.Add(1)
Expand All @@ -315,10 +315,10 @@ func (k *KVExecutor) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight u
if err != nil {
// This is problematic, state was changed but root calculation failed.
// May need more robust error handling or recovery logic.
return nil, fmt.Errorf("failed to compute state root after executing transactions: %w", err)
return execution.ExecuteResult{}, fmt.Errorf("failed to compute state root after executing transactions: %w", err)
}

return stateRoot, nil
return execution.ExecuteResult{UpdatedStateRoot: stateRoot}, nil
}

// SetFinal marks a block as finalized at the specified height.
Expand Down
12 changes: 6 additions & 6 deletions apps/testapp/kv/kvexecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,13 @@ func TestExecuteTxs_Valid(t *testing.T) {
[]byte("key2=value2"),
}

stateRoot, err := exec.ExecuteTxs(ctx, txs, 1, time.Now(), []byte(""))
result, err := exec.ExecuteTxs(ctx, txs, 1, time.Now(), []byte(""))
if err != nil {
t.Fatalf("ExecuteTxs failed: %v", err)
}

// Check that stateRoot contains the updated key-value pairs
rootStr := string(stateRoot)
rootStr := string(result.UpdatedStateRoot)
if !strings.Contains(rootStr, "key1:value1;") || !strings.Contains(rootStr, "key2:value2;") {
t.Errorf("State root does not contain expected key-values: %s", rootStr)
}
Expand All @@ -134,13 +134,13 @@ func TestExecuteTxs_Invalid(t *testing.T) {
[]byte(""),
}

stateRoot, err := exec.ExecuteTxs(ctx, txs, 1, time.Now(), []byte(""))
result, err := exec.ExecuteTxs(ctx, txs, 1, time.Now(), []byte(""))
if err != nil {
t.Fatalf("ExecuteTxs should handle gibberish gracefully, got error: %v", err)
}

// State root should still be computed (empty block is valid)
if stateRoot == nil {
if result.UpdatedStateRoot == nil {
t.Error("Expected non-nil state root even with all invalid transactions")
}

Expand All @@ -152,13 +152,13 @@ func TestExecuteTxs_Invalid(t *testing.T) {
[]byte(""),
}

stateRoot2, err := exec.ExecuteTxs(ctx, mixedTxs, 2, time.Now(), stateRoot)
result2, err := exec.ExecuteTxs(ctx, mixedTxs, 2, time.Now(), result.UpdatedStateRoot)
if err != nil {
t.Fatalf("ExecuteTxs should filter invalid transactions and process valid ones, got error: %v", err)
}

// State root should contain only the valid transactions
rootStr := string(stateRoot2)
rootStr := string(result2.UpdatedStateRoot)
if !strings.Contains(rootStr, "valid_key:valid_value") || !strings.Contains(rootStr, "another_valid:value2") {
t.Errorf("State root should contain valid transactions: %s", rootStr)
}
Expand Down
34 changes: 22 additions & 12 deletions block/internal/common/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,12 @@ func (s *Replayer) replayBlock(ctx context.Context, height uint64) error {
if height == s.genesis.InitialHeight {
// For the first block, use genesis state.
prevState = types.State{
ChainID: s.genesis.ChainID,
InitialHeight: s.genesis.InitialHeight,
LastBlockHeight: s.genesis.InitialHeight - 1,
LastBlockTime: s.genesis.StartTime,
AppHash: header.AppHash, // Genesis app hash (input to first block execution)
ChainID: s.genesis.ChainID,
InitialHeight: s.genesis.InitialHeight,
LastBlockHeight: s.genesis.InitialHeight - 1,
LastBlockTime: s.genesis.StartTime,
AppHash: header.AppHash, // Genesis app hash (input to first block execution)
NextProposerAddress: append([]byte(nil), s.genesis.ProposerAddress...),
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
}
} else {
// GetStateAtHeight(height-1) returns the state AFTER block height-1 was executed,
Expand All @@ -179,10 +180,16 @@ func (s *Replayer) replayBlock(ctx context.Context, height uint64) error {
Int("tx_count", len(rawTxs)).
Msg("executing transactions on execution layer")

newAppHash, err := s.exec.ExecuteTxs(ctx, rawTxs, height, header.Time(), prevState.AppHash)
result, err := s.exec.ExecuteTxs(ctx, rawTxs, height, header.Time(), prevState.AppHash)
if err != nil {
return fmt.Errorf("failed to execute transactions: %w", err)
}
newAppHash := result.UpdatedStateRoot

newState, err := prevState.NextState(header.Header, newAppHash, result.NextProposerAddress)
if err != nil {
return fmt.Errorf("calculate next state: %w", err)
}

// The result of ExecuteTxs (newAppHash) should match the stored state at this height.
// Note: header.AppHash is the PREVIOUS state's app hash (input), not the expected output.
Expand All @@ -207,6 +214,15 @@ func (s *Replayer) replayBlock(ctx context.Context, height uint64) error {
Msg("app hash mismatch during replay")
return err
}
if len(expectedState.NextProposerAddress) > 0 {
if !bytes.Equal(newState.NextProposerAddress, expectedState.NextProposerAddress) {
return fmt.Errorf("next proposer mismatch at height %d: expected %x got %x",
height,
expectedState.NextProposerAddress,
newState.NextProposerAddress,
)
}
}
s.logger.Debug().
Uint64("height", height).
Str("app_hash", hex.EncodeToString(newAppHash)).
Expand All @@ -219,12 +235,6 @@ func (s *Replayer) replayBlock(ctx context.Context, height uint64) error {
Msg("replayBlock: ExecuteTxs completed (no stored state to verify against)")
}

// Calculate new state
newState, err := prevState.NextState(header.Header, newAppHash)
if err != nil {
return fmt.Errorf("calculate next state: %w", err)
}

// Persist the new state
batch, err := s.store.NewBatch(ctx)
if err != nil {
Expand Down
86 changes: 61 additions & 25 deletions block/internal/executing/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,6 @@ func NewExecutor(
return nil, errors.New("signer cannot be nil")
}

addr, err := signer.GetAddress()
if err != nil {
return nil, fmt.Errorf("failed to get address: %w", err)
}

if !bytes.Equal(addr, genesis.ProposerAddress) {
return nil, common.ErrNotProposer
}
}
if raftNode != nil && reflect.ValueOf(raftNode).IsNil() {
raftNode = nil
Expand Down Expand Up @@ -242,15 +234,22 @@ func (e *Executor) initializeState() error {
}

state = types.State{
ChainID: e.genesis.ChainID,
InitialHeight: e.genesis.InitialHeight,
LastBlockHeight: e.genesis.InitialHeight - 1,
LastBlockTime: e.genesis.StartTime,
AppHash: stateRoot,
ChainID: e.genesis.ChainID,
InitialHeight: e.genesis.InitialHeight,
LastBlockHeight: e.genesis.InitialHeight - 1,
LastBlockTime: e.genesis.StartTime,
AppHash: stateRoot,
NextProposerAddress: e.initialProposerAddress(e.ctx),
// DA start height is usually 0 at InitChain unless it is a re-genesis or a based sequencer.
DAHeight: e.genesis.DAStartHeight,
}
}
if len(state.NextProposerAddress) == 0 {
state.NextProposerAddress = e.initialProposerAddress(e.ctx)
}
if err := e.assertConfiguredSigner(state.NextProposerAddress); err != nil {
return err
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

if e.raftNode != nil {
// Ensure node is fully synced before producing any blocks
Expand Down Expand Up @@ -379,6 +378,32 @@ func (e *Executor) initializeState() error {
return nil
}

func (e *Executor) initialProposerAddress(ctx context.Context) []byte {
if e.exec != nil {
info, err := e.exec.GetExecutionInfo(ctx)
if err != nil {
e.logger.Warn().Err(err).Msg("failed to get execution info for proposer, falling back to genesis proposer")
} else if len(info.NextProposerAddress) > 0 {
return append([]byte(nil), info.NextProposerAddress...)
}
}
return append([]byte(nil), e.genesis.ProposerAddress...)
}

func (e *Executor) assertConfiguredSigner(expectedProposer []byte) error {
if e.config.Node.BasedSequencer {
return nil
}
addr, err := e.signer.GetAddress()
if err != nil {
return fmt.Errorf("failed to get address: %w", err)
}
if !bytes.Equal(addr, expectedProposer) {
return common.ErrNotProposer
}
return nil
}

// executionLoop handles block production and aggregation
func (e *Executor) executionLoop() {
e.logger.Info().Msg("starting execution loop")
Expand Down Expand Up @@ -696,6 +721,10 @@ func (e *Executor) RetrieveBatch(ctx context.Context) (*BatchData, error) {
func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *BatchData) (*types.SignedHeader, *types.Data, error) {
currentState := e.getLastState()
headerTime := uint64(e.genesis.StartTime.UnixNano())
proposerAddress := currentState.NextProposerAddress
if len(proposerAddress) == 0 {
proposerAddress = e.genesis.ProposerAddress
}

var lastHeaderHash types.Hash
var lastDataHash types.Hash
Expand Down Expand Up @@ -736,14 +765,21 @@ func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *Ba
if err != nil {
return nil, nil, fmt.Errorf("failed to get public key: %w", err)
}
addr, err := e.signer.GetAddress()
if err != nil {
return nil, nil, fmt.Errorf("failed to get address: %w", err)
}
if !bytes.Equal(addr, proposerAddress) {
return nil, nil, common.ErrNotProposer
}

validatorHash, err = e.options.ValidatorHasherProvider(e.genesis.ProposerAddress, pubKey)
validatorHash, err = e.options.ValidatorHasherProvider(proposerAddress, pubKey)
if err != nil {
return nil, nil, fmt.Errorf("failed to get validator hash: %w", err)
}
} else {
var err error
validatorHash, err = e.options.ValidatorHasherProvider(e.genesis.ProposerAddress, nil)
validatorHash, err = e.options.ValidatorHasherProvider(proposerAddress, nil)
if err != nil {
return nil, nil, fmt.Errorf("failed to get validator hash: %w", err)
}
Expand All @@ -763,13 +799,13 @@ func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *Ba
},
LastHeaderHash: lastHeaderHash,
AppHash: currentState.AppHash,
ProposerAddress: e.genesis.ProposerAddress,
ProposerAddress: proposerAddress,
ValidatorHash: validatorHash,
},
Signature: lastSignature,
Signer: types.Signer{
PubKey: pubKey,
Address: e.genesis.ProposerAddress,
Address: proposerAddress,
},
}

Expand Down Expand Up @@ -813,14 +849,14 @@ func (e *Executor) ApplyBlock(ctx context.Context, header types.Header, data *ty
// Execute transactions
execCtx := context.WithValue(ctx, types.HeaderContextKey, header)

newAppHash, err := e.executeTxsWithRetry(execCtx, rawTxs, header, currentState)
result, err := e.executeTxsWithRetry(execCtx, rawTxs, header, currentState)
if err != nil {
e.sendCriticalError(fmt.Errorf("failed to execute transactions: %w", err))
return types.State{}, fmt.Errorf("failed to execute transactions: %w", err)
}

// Create new state
newState, err := currentState.NextState(header, newAppHash)
newState, err := currentState.NextState(header, result.UpdatedStateRoot, result.NextProposerAddress)
if err != nil {
return types.State{}, fmt.Errorf("failed to create next state: %w", err)
}
Expand Down Expand Up @@ -851,12 +887,12 @@ func (e *Executor) signHeader(ctx context.Context, header *types.Header) (types.

// executeTxsWithRetry executes transactions with retry logic.
// NOTE: the function retries the execution client call regardless of the error. Some execution clients errors are irrecoverable, and will eventually halt the node, as expected.
func (e *Executor) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, header types.Header, currentState types.State) ([]byte, error) {
func (e *Executor) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, header types.Header, currentState types.State) (coreexecutor.ExecuteResult, error) {
for attempt := 1; attempt <= common.MaxRetriesBeforeHalt; attempt++ {
newAppHash, err := e.exec.ExecuteTxs(ctx, rawTxs, header.Height(), header.Time(), currentState.AppHash)
result, err := e.exec.ExecuteTxs(ctx, rawTxs, header.Height(), header.Time(), currentState.AppHash)
if err != nil {
if attempt == common.MaxRetriesBeforeHalt {
return nil, fmt.Errorf("failed to execute transactions: %w", err)
return coreexecutor.ExecuteResult{}, fmt.Errorf("failed to execute transactions: %w", err)
}

e.logger.Error().Err(err).
Expand All @@ -869,14 +905,14 @@ func (e *Executor) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, hea
case <-time.After(common.MaxRetriesTimeout):
continue
case <-e.ctx.Done():
return nil, fmt.Errorf("context cancelled during retry: %w", e.ctx.Err())
return coreexecutor.ExecuteResult{}, fmt.Errorf("context cancelled during retry: %w", e.ctx.Err())
}
}

return newAppHash, nil
return result, nil
}

return nil, nil
return coreexecutor.ExecuteResult{}, nil
}

// sendCriticalError sends a critical error to the error channel without blocking
Expand Down
4 changes: 2 additions & 2 deletions block/internal/executing/executor_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ func (s *stubExecClient) InitChain(context.Context, time.Time, uint64, string) (
return s.stateRoot, nil
}
func (s *stubExecClient) GetTxs(context.Context) ([][]byte, error) { return nil, nil }
func (s *stubExecClient) ExecuteTxs(_ context.Context, _ [][]byte, _ uint64, _ time.Time, _ []byte) ([]byte, error) {
return s.stateRoot, nil
func (s *stubExecClient) ExecuteTxs(_ context.Context, _ [][]byte, _ uint64, _ time.Time, _ []byte) (coreexec.ExecuteResult, error) {
return coreexec.ExecuteResult{UpdatedStateRoot: s.stateRoot}, nil
}
func (s *stubExecClient) SetFinal(context.Context, uint64) error { return nil }
func (s *stubExecClient) GetExecutionInfo(context.Context) (coreexec.ExecutionInfo, error) {
Expand Down
Loading
Loading