Skip to content

Commit

Permalink
update bold staker items
Browse files Browse the repository at this point in the history
  • Loading branch information
rauljordan committed Jan 8, 2025
2 parents 79b3af0 + 5d4cb93 commit 19b14a8
Show file tree
Hide file tree
Showing 20 changed files with 280 additions and 116 deletions.
85 changes: 62 additions & 23 deletions arbnode/message_pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"

"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/util/stopwaiter"
Expand All @@ -23,15 +24,13 @@ import (

type MessagePruner struct {
stopwaiter.StopWaiter
transactionStreamer *TransactionStreamer
inboxTracker *InboxTracker
config MessagePrunerConfigFetcher
pruningLock sync.Mutex
lastPruneDone time.Time
cachedPrunedMessages uint64
cachedPrunedBlockHashesInputFeed uint64
cachedPrunedMessageResult uint64
cachedPrunedDelayedMessages uint64
transactionStreamer *TransactionStreamer
inboxTracker *InboxTracker
config MessagePrunerConfigFetcher
pruningLock sync.Mutex
lastPruneDone time.Time
cachedPrunedMessages uint64
cachedPrunedDelayedMessages uint64
}

type MessagePrunerConfig struct {
Expand Down Expand Up @@ -121,59 +120,99 @@ func (m *MessagePruner) prune(ctx context.Context, count arbutil.MessageIndex, g
}

func (m *MessagePruner) deleteOldMessagesFromDB(ctx context.Context, messageCount arbutil.MessageIndex, delayedMessageCount uint64) error {
prunedKeysRange, err := deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, messageResultPrefix, &m.cachedPrunedMessageResult, uint64(messageCount))
if m.cachedPrunedMessages == 0 {
m.cachedPrunedMessages = fetchLastPrunedKey(m.transactionStreamer.db, lastPrunedMessageKey)
}
if m.cachedPrunedDelayedMessages == 0 {
m.cachedPrunedDelayedMessages = fetchLastPrunedKey(m.inboxTracker.db, lastPrunedDelayedMessageKey)
}
prunedKeysRange, _, err := deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, messageResultPrefix, m.cachedPrunedMessages, uint64(messageCount))
if err != nil {
return fmt.Errorf("error deleting message results: %w", err)
}
if len(prunedKeysRange) > 0 {
log.Info("Pruned message results:", "first pruned key", prunedKeysRange[0], "last pruned key", prunedKeysRange[len(prunedKeysRange)-1])
}

prunedKeysRange, err = deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, blockHashInputFeedPrefix, &m.cachedPrunedBlockHashesInputFeed, uint64(messageCount))
prunedKeysRange, _, err = deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, blockHashInputFeedPrefix, m.cachedPrunedMessages, uint64(messageCount))
if err != nil {
return fmt.Errorf("error deleting expected block hashes: %w", err)
}
if len(prunedKeysRange) > 0 {
log.Info("Pruned expected block hashes:", "first pruned key", prunedKeysRange[0], "last pruned key", prunedKeysRange[len(prunedKeysRange)-1])
}

prunedKeysRange, err = deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, messagePrefix, &m.cachedPrunedMessages, uint64(messageCount))
prunedKeysRange, lastPrunedMessage, err := deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, messagePrefix, m.cachedPrunedMessages, uint64(messageCount))
if err != nil {
return fmt.Errorf("error deleting last batch messages: %w", err)
}
if len(prunedKeysRange) > 0 {
log.Info("Pruned last batch messages:", "first pruned key", prunedKeysRange[0], "last pruned key", prunedKeysRange[len(prunedKeysRange)-1])
}
insertLastPrunedKey(m.transactionStreamer.db, lastPrunedMessageKey, lastPrunedMessage)
m.cachedPrunedMessages = lastPrunedMessage

prunedKeysRange, err = deleteFromLastPrunedUptoEndKey(ctx, m.inboxTracker.db, rlpDelayedMessagePrefix, &m.cachedPrunedDelayedMessages, delayedMessageCount)
prunedKeysRange, lastPrunedDelayedMessage, err := deleteFromLastPrunedUptoEndKey(ctx, m.inboxTracker.db, rlpDelayedMessagePrefix, m.cachedPrunedDelayedMessages, delayedMessageCount)
if err != nil {
return fmt.Errorf("error deleting last batch delayed messages: %w", err)
}
if len(prunedKeysRange) > 0 {
log.Info("Pruned last batch delayed messages:", "first pruned key", prunedKeysRange[0], "last pruned key", prunedKeysRange[len(prunedKeysRange)-1])
}
insertLastPrunedKey(m.inboxTracker.db, lastPrunedDelayedMessageKey, lastPrunedDelayedMessage)
m.cachedPrunedDelayedMessages = lastPrunedDelayedMessage
return nil
}

// deleteFromLastPrunedUptoEndKey is similar to deleteFromRange but automatically populates the start key
// cachedStartMinKey must not be nil. It's set to the new start key at the end of this function if successful.
func deleteFromLastPrunedUptoEndKey(ctx context.Context, db ethdb.Database, prefix []byte, cachedStartMinKey *uint64, endMinKey uint64) ([]uint64, error) {
startMinKey := *cachedStartMinKey
// deleteFromLastPrunedUptoEndKey is similar to deleteFromRange but automatically populates the start key if it's not set.
// It's returns the new start key (i.e. last pruned key) at the end of this function if successful.
func deleteFromLastPrunedUptoEndKey(ctx context.Context, db ethdb.Database, prefix []byte, startMinKey uint64, endMinKey uint64) ([]uint64, uint64, error) {
if startMinKey == 0 {
startIter := db.NewIterator(prefix, uint64ToKey(1))
if !startIter.Next() {
return nil, nil
return nil, 0, nil
}
startMinKey = binary.BigEndian.Uint64(bytes.TrimPrefix(startIter.Key(), prefix))
startIter.Release()
}
if endMinKey <= startMinKey {
*cachedStartMinKey = startMinKey
return nil, nil
return nil, startMinKey, nil
}
keys, err := deleteFromRange(ctx, db, prefix, startMinKey, endMinKey-1)
if err == nil {
*cachedStartMinKey = endMinKey - 1
return keys, endMinKey - 1, err
}

func insertLastPrunedKey(db ethdb.Database, lastPrunedKey []byte, lastPrunedValue uint64) {
lastPrunedValueByte, err := rlp.EncodeToBytes(lastPrunedValue)
if err != nil {
log.Error("error encoding last pruned value: %w", err)
} else {
err = db.Put(lastPrunedKey, lastPrunedValueByte)
if err != nil {
log.Error("error saving last pruned value: %w", err)
}
}
}

func fetchLastPrunedKey(db ethdb.Database, lastPrunedKey []byte) uint64 {
hasKey, err := db.Has(lastPrunedKey)
if err != nil {
log.Warn("error checking for last pruned key: %w", err)
return 0
}
if !hasKey {
return 0
}
lastPrunedValueByte, err := db.Get(lastPrunedKey)
if err != nil {
log.Warn("error fetching last pruned key: %w", err)
return 0
}
var lastPrunedValue uint64
err = rlp.DecodeBytes(lastPrunedValueByte, &lastPrunedValue)
if err != nil {
log.Warn("error decoding last pruned value: %w", err)
return 0
}
return keys, err
return lastPrunedValue
}
95 changes: 84 additions & 11 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ func ConfigDefaultL1NonSequencerTest() *Config {
config.Staker = legacystaker.TestL1ValidatorConfig
config.Staker.Enable = false
config.BlockValidator.ValidationServerConfigs = []rpcclient.ClientConfig{{URL: ""}}
config.Bold.MinimumGapToParentAssertion = 0

return &config
}
Expand All @@ -230,6 +231,7 @@ func ConfigDefaultL2Test() *Config {
config.Staker.Enable = false
config.BlockValidator.ValidationServerConfigs = []rpcclient.ClientConfig{{URL: ""}}
config.TransactionStreamer = DefaultTransactionStreamerConfig
config.Bold.MinimumGapToParentAssertion = 0

return &config
}
Expand Down Expand Up @@ -285,19 +287,21 @@ type Node struct {
}

type SnapSyncConfig struct {
Enabled bool
PrevBatchMessageCount uint64
PrevDelayedRead uint64
BatchCount uint64
DelayedCount uint64
Enabled bool
PrevBatchMessageCount uint64
PrevDelayedRead uint64
BatchCount uint64
DelayedCount uint64
ParentChainAssertionBlock uint64
}

var DefaultSnapSyncConfig = SnapSyncConfig{
Enabled: false,
PrevBatchMessageCount: 0,
BatchCount: 0,
DelayedCount: 0,
PrevDelayedRead: 0,
Enabled: false,
PrevBatchMessageCount: 0,
PrevDelayedRead: 0,
BatchCount: 0,
DelayedCount: 0,
ParentChainAssertionBlock: 0,
}

type ConfigFetcher interface {
Expand Down Expand Up @@ -596,7 +600,29 @@ func createNodeImpl(
if err != nil {
return nil, err
}
inboxReader, err := NewInboxReader(inboxTracker, l1client, l1Reader, new(big.Int).SetUint64(deployInfo.DeployedAt), delayedBridge, sequencerInbox, func() *InboxReaderConfig { return &configFetcher.Get().InboxReader })
firstMessageBlock := new(big.Int).SetUint64(deployInfo.DeployedAt)
if config.SnapSyncTest.Enabled {
batchCount := config.SnapSyncTest.BatchCount
delayedMessageNumber, err := exec.NextDelayedMessageNumber()
if err != nil {
return nil, err
}
if batchCount > delayedMessageNumber {
batchCount = delayedMessageNumber
}
// Find the first block containing the batch count.
// Subtract 1 to get the block before the needed batch count,
// this is done to fetch previous batch metadata needed for snap sync.
if batchCount > 0 {
batchCount--
}
block, err := FindBlockContainingBatchCount(ctx, deployInfo.Bridge, l1client, config.SnapSyncTest.ParentChainAssertionBlock, batchCount)
if err != nil {
return nil, err
}
firstMessageBlock.SetUint64(block)
}
inboxReader, err := NewInboxReader(inboxTracker, l1client, l1Reader, firstMessageBlock, delayedBridge, sequencerInbox, func() *InboxReaderConfig { return &configFetcher.Get().InboxReader })
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -772,6 +798,53 @@ func createNodeImpl(
}, nil
}

func FindBlockContainingBatchCount(ctx context.Context, bridgeAddress common.Address, l1Client *ethclient.Client, parentChainAssertionBlock uint64, batchCount uint64) (uint64, error) {
bridge, err := bridgegen.NewIBridge(bridgeAddress, l1Client)
if err != nil {
return 0, err
}
high := parentChainAssertionBlock
low := uint64(0)
reduceBy := uint64(100)
if high > reduceBy {
low = high - reduceBy
}
// Reduce high and low by 100 until lowNode.InboxMaxCount < batchCount
// This will give us a range (low to high) of blocks that contain the batch count.
for low > 0 {
lowCount, err := bridge.SequencerMessageCount(&bind.CallOpts{Context: ctx, BlockNumber: new(big.Int).SetUint64(low)})
if err != nil {
return 0, err
}
if lowCount.Uint64() > batchCount {
high = low
reduceBy = reduceBy * 2
if low > reduceBy {
low = low - reduceBy
} else {
low = 0
}
} else {
break
}
}
// Then binary search between low and high to find the block containing the batch count.
for low < high {
mid := low + (high-low)/2

midCount, err := bridge.SequencerMessageCount(&bind.CallOpts{Context: ctx, BlockNumber: new(big.Int).SetUint64(mid)})
if err != nil {
return 0, err
}
if midCount.Uint64() < batchCount {
low = mid + 1
} else {
high = mid
}
}
return low, nil
}

func (n *Node) OnConfigReload(_ *Config, _ *Config) error {
// TODO
return nil
Expand Down
10 changes: 6 additions & 4 deletions arbnode/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ var (
sequencerBatchMetaPrefix []byte = []byte("s") // maps a batch sequence number to BatchMetadata
delayedSequencedPrefix []byte = []byte("a") // maps a delayed message count to the first sequencer batch sequence number with this delayed count

messageCountKey []byte = []byte("_messageCount") // contains the current message count
delayedMessageCountKey []byte = []byte("_delayedMessageCount") // contains the current delayed message count
sequencerBatchCountKey []byte = []byte("_sequencerBatchCount") // contains the current sequencer message count
dbSchemaVersion []byte = []byte("_schemaVersion") // contains a uint64 representing the database schema version
messageCountKey []byte = []byte("_messageCount") // contains the current message count
lastPrunedMessageKey []byte = []byte("_lastPrunedMessageKey") // contains the last pruned message key
lastPrunedDelayedMessageKey []byte = []byte("_lastPrunedDelayedMessageKey") // contains the last pruned RLP delayed message key
delayedMessageCountKey []byte = []byte("_delayedMessageCount") // contains the current delayed message count
sequencerBatchCountKey []byte = []byte("_sequencerBatchCount") // contains the current sequencer message count
dbSchemaVersion []byte = []byte("_schemaVersion") // contains a uint64 representing the database schema version
)

const currentDbSchemaVersion uint64 = 1
7 changes: 3 additions & 4 deletions arbos/arbosState/arbosstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/triedb"
"github.com/ethereum/go-ethereum/triedb/hashdb"
Expand Down Expand Up @@ -123,13 +122,13 @@ func NewArbosMemoryBackedArbOSState() (*ArbosState, *state.StateDB) {
db := state.NewDatabaseWithConfig(raw, trieConfig)
statedb, err := state.New(common.Hash{}, db, nil)
if err != nil {
log.Crit("failed to init empty statedb", "error", err)
panic("failed to init empty statedb: " + err.Error())
}
burner := burn.NewSystemBurner(nil, false)
chainConfig := chaininfo.ArbitrumDevTestChainConfig()
newState, err := InitializeArbosState(statedb, burner, chainConfig, arbostypes.TestInitMessage)
if err != nil {
log.Crit("failed to open the ArbOS state", "error", err)
panic("failed to open the ArbOS state: " + err.Error())
}
return newState, statedb
}
Expand All @@ -139,7 +138,7 @@ func ArbOSVersion(stateDB vm.StateDB) uint64 {
backingStorage := storage.NewGeth(stateDB, burn.NewSystemBurner(nil, false))
arbosVersion, err := backingStorage.GetUint64ByUint64(uint64(versionOffset))
if err != nil {
log.Crit("failed to get the ArbOS version", "error", err)
panic("failed to get the ArbOS version: " + err.Error())
}
return arbosVersion
}
Expand Down
4 changes: 2 additions & 2 deletions arbos/arbosState/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func InitializeArbosInDatabase(db ethdb.Database, cacheConfig *core.CacheConfig,
}()
statedb, err := state.New(common.Hash{}, stateDatabase, nil)
if err != nil {
log.Crit("failed to init empty statedb", "error", err)
panic("failed to init empty statedb :" + err.Error())
}

noStateTrieChangesToCommitError := regexp.MustCompile("^triedb layer .+ is disk layer$")
Expand Down Expand Up @@ -96,7 +96,7 @@ func InitializeArbosInDatabase(db ethdb.Database, cacheConfig *core.CacheConfig,
burner := burn.NewSystemBurner(nil, false)
arbosState, err := InitializeArbosState(statedb, burner, chainConfig, initMessage)
if err != nil {
log.Crit("failed to open the ArbOS state", "error", err)
panic("failed to open the ArbOS state :" + err.Error())
}

chainOwner, err := initData.GetChainOwner()
Expand Down
12 changes: 6 additions & 6 deletions arbos/programs/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
package programs

import (
"strconv"

"github.com/holiman/uint256"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"

"github.com/offchainlabs/nitro/arbos/util"
Expand Down Expand Up @@ -151,7 +152,7 @@ func newApiClosures(
case vm.STATICCALL:
ret, returnGas, err = evm.StaticCall(scope.Contract, contract, input, gas)
default:
log.Crit("unsupported call type", "opcode", opcode)
panic("unsupported call type: " + opcode.String())
}

interpreter.SetReturnData(ret)
Expand Down Expand Up @@ -266,7 +267,7 @@ func newApiClosures(
original := input

crash := func(reason string) {
log.Crit("bad API call", "reason", reason, "request", req, "len", len(original), "remaining", len(input))
panic("bad API call reason: " + reason + " request: " + strconv.Itoa(int(req)) + " len: " + strconv.Itoa(len(original)) + " remaining: " + strconv.Itoa(len(input)))
}
takeInput := func(needed int, reason string) []byte {
if len(input) < needed {
Expand Down Expand Up @@ -338,7 +339,7 @@ func newApiClosures(
case StaticCall:
opcode = vm.STATICCALL
default:
log.Crit("unsupported call type", "opcode", opcode)
panic("unsupported call type opcode: " + opcode.String())
}
contract := takeAddress()
value := takeU256()
Expand Down Expand Up @@ -414,8 +415,7 @@ func newApiClosures(
captureHostio(name, args, outs, startInk, endInk)
return []byte{}, nil, 0
default:
log.Crit("unsupported call type", "req", req)
return []byte{}, nil, 0
panic("unsupported call type: " + strconv.Itoa(int(req)))
}
}
}
Loading

0 comments on commit 19b14a8

Please sign in to comment.