Skip to content

Commit

Permalink
IntermediateRoot: add flag for threshold to update concurrently
Browse files Browse the repository at this point in the history
Split the root updates of stateObjects across goroutines when the number of stateObjects is at least the threshold
statedb_test.go/TestIntermediateUpdateConcurrently: add a test checking that the states produced by the sequential and concurrent options are identical
  • Loading branch information
Francesco4203 committed May 21, 2024
1 parent 9e55df7 commit 324ccbd
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 0 deletions.
40 changes: 40 additions & 0 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"errors"
"fmt"
"math/big"
"runtime"
"sort"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -122,6 +124,9 @@ type StateDB struct {
StorageUpdated int
AccountDeleted int
StorageDeleted int

// Minimum stateObjects (updating accounts) to apply concurrent updates, 0 to disable
ConcurrentUpdateThreshold int
}

// New creates a new state from a given trie.
Expand Down Expand Up @@ -855,11 +860,46 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
// the account prefetcher. Instead, let's process all the storage updates
// first, giving the account prefetches just a few more milliseconds of time
// to pull useful data from disk.

// Get the stateObjects needed to be updated
updateObjs := []*stateObject{}
for addr := range s.stateObjectsPending {
if obj := s.stateObjects[addr]; !obj.deleted {
updateObjs = append(updateObjs, obj)
}
}

if len(updateObjs) < s.ConcurrentUpdateThreshold || s.ConcurrentUpdateThreshold == 0 {
// Update the state objects sequentially
for _, obj := range updateObjs {
obj.updateRoot(s.db)
}
} else {
// Declare min function
min := func(a, b int) int {
if a < b {
return a
}
return b
}
// Update the state objects using goroutines, with maximum of NumCPU goroutines
nRoutines := min(runtime.NumCPU(), len(updateObjs))
if nRoutines != 0 {
nObjPerRoutine := (len(updateObjs) + nRoutines - 1) / nRoutines
wg := sync.WaitGroup{}
wg.Add(nRoutines)
for i := 0; i < len(updateObjs); i += nObjPerRoutine {
go func(objs []*stateObject) {
defer wg.Done()
for _, obj := range objs {
obj.updateRoot(s.db)
}
}(updateObjs[i:min(i+nObjPerRoutine, len(updateObjs))])
}
wg.Wait()
}
}

// Now we're about to start to write changes to the trie. The trie is so far
// _untouched_. We can check with the prefetcher, if it can give us a trie
// which has the same root, but also has some content loaded into it.
Expand Down
76 changes: 76 additions & 0 deletions core/state/statedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"sync"
"testing"
"testing/quick"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
Expand Down Expand Up @@ -915,3 +916,78 @@ func TestStateDBAccessList(t *testing.T) {
t.Fatalf("expected empty, got %d", got)
}
}

// TestIntermediateUpdateConcurrently applies the same random account changes to
// two independent in-memory states, computes the intermediate root with
// ConcurrentUpdateThreshold set to 0 on one and to 1 on the other, commits
// both, and checks that the state roots and the full contents of the two
// backing databases are identical.
func TestIntermediateUpdateConcurrently(t *testing.T) {
	// Log the seed so that a failing run can be reproduced.
	seed := time.Now().Unix()
	t.Logf("random seed: %d", seed)
	rng := rand.New(rand.NewSource(seed))

	// Create two empty states backed by separate databases.
	db1 := rawdb.NewMemoryDatabase()
	db2 := rawdb.NewMemoryDatabase()
	state1, err := New(common.Hash{}, NewDatabase(db1), nil)
	if err != nil {
		t.Fatalf("cannot create sequential state: %v", err)
	}
	state2, err := New(common.Hash{}, NewDatabase(db2), nil)
	if err != nil {
		t.Fatalf("cannot create concurrent state: %v", err)
	}

	// Apply identical random updates to both states.
	for i := int64(0); i < 1000; i++ {
		addr := common.BigToAddress(big.NewInt(i))
		balance := big.NewInt(rng.Int63())
		nonce := rng.Uint64()
		key := common.BigToHash(big.NewInt(rng.Int63()))
		value := common.BigToHash(big.NewInt(rng.Int63()))
		code := []byte{byte(rng.Uint64()), byte(rng.Uint64()), byte(rng.Uint64())}
		for _, state := range []*StateDB{state1, state2} {
			state.SetBalance(addr, balance)
			state.SetNonce(addr, nonce)
			state.SetState(addr, key, value)
			state.SetCode(addr, code)
		}
	}

	state1.ConcurrentUpdateThreshold = 0
	state2.ConcurrentUpdateThreshold = 1

	state1.IntermediateRoot(false) // sequential
	state2.IntermediateRoot(false) // concurrent

	root1, err1 := state1.Commit(false)
	root2, err2 := state2.Commit(false)

	if err1 != nil {
		t.Fatalf("sequential commit failed: %v", err1)
	}
	if err1 = state1.Database().TrieDB().Commit(root1, false, nil); err1 != nil {
		t.Errorf("cannot commit trie %v to persistent database: %v", root1.Hex(), err1)
	}
	if err2 != nil {
		t.Fatalf("concurrent commit failed: %v", err2)
	}
	if err2 = state2.Database().TrieDB().Commit(root2, false, nil); err2 != nil {
		t.Errorf("cannot commit trie %v to persistent database: %v", root2.Hex(), err2)
	}
	if root1 != root2 {
		t.Fatalf("state root mismatch: sequential %s != concurrent %s", root1.Hex(), root2.Hex())
	}

	// Walk both databases in lockstep; every key/value pair must match.
	it1 := db1.NewIterator(nil, nil)
	defer it1.Release()
	it2 := db2.NewIterator(nil, nil)
	defer it2.Release()
	for it1.Next() {
		if !it2.Next() {
			t.Fatalf("concurrent iterator ended prematurely")
		}
		// Keys and values are raw bytes: print them as hex through a constant
		// format string so a '%' in the data is never treated as a verb.
		if !bytes.Equal(it1.Key(), it2.Key()) {
			t.Fatalf("concurrent iterator key mismatch: %x != %x", it1.Key(), it2.Key())
		}
		if !bytes.Equal(it1.Value(), it2.Value()) {
			t.Fatalf("concurrent iterator value mismatch: %x != %x", it1.Value(), it2.Value())
		}
	}
	if it1.Error() != nil {
		t.Fatalf("sequential iterator error: %v", it1.Error())
	}
	if it2.Error() != nil {
		t.Fatalf("concurrent iterator error: %v", it2.Error())
	}
	// The sequential iterator is exhausted here, so only the concurrent one
	// can still hold extra entries.
	if it2.Next() {
		t.Fatalf("concurrent iterator has extra data")
	}
}

0 comments on commit 324ccbd

Please sign in to comment.