Skip to content

Commit

Permalink
Merge pull request #2415 from OffchainLabs/consensus_exec_split_l1gas
Browse files Browse the repository at this point in the history
[NIT-2558] Consensus/Execution split: simplifies ConsensusSequencer and ExecutionEngine interfaces
  • Loading branch information
tsahee authored Jun 26, 2024
2 parents d906798 + d9a5572 commit 4e0d39c
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 174 deletions.
12 changes: 0 additions & 12 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ var (
baseFeeGauge = metrics.NewRegisteredGauge("arb/batchposter/basefee", nil)
blobFeeGauge = metrics.NewRegisteredGauge("arb/batchposter/blobfee", nil)
l1GasPriceGauge = metrics.NewRegisteredGauge("arb/batchposter/l1gasprice", nil)
l1GasPriceEstimateGauge = metrics.NewRegisteredGauge("arb/batchposter/l1gasprice/estimate", nil)
latestBatchSurplusGauge = metrics.NewRegisteredGauge("arb/batchposter/latestbatchsurplus", nil)
blockGasUsedGauge = metrics.NewRegisteredGauge("arb/batchposter/blockgas/used", nil)
blockGasLimitGauge = metrics.NewRegisteredGauge("arb/batchposter/blockgas/limit", nil)
blobGasUsedGauge = metrics.NewRegisteredGauge("arb/batchposter/blobgas/used", nil)
Expand Down Expand Up @@ -568,9 +566,7 @@ func (b *BatchPoster) pollForL1PriceData(ctx context.Context) {
} else {
suggestedTipCapGauge.Update(suggestedTipCap.Int64())
}
l1GasPriceEstimate := b.streamer.CurrentEstimateOfL1GasPrice()
l1GasPriceGauge.Update(int64(l1GasPrice))
l1GasPriceEstimateGauge.Update(int64(l1GasPriceEstimate))
case <-ctx.Done():
return
}
Expand Down Expand Up @@ -1369,14 +1365,6 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
"numBlobs", len(kzgBlobs),
)

surplus := arbmath.SaturatingMul(
arbmath.SaturatingSub(
l1GasPriceGauge.Snapshot().Value(),
l1GasPriceEstimateGauge.Snapshot().Value()),
int64(len(sequencerMsg)*16),
)
latestBatchSurplusGauge.Update(surplus)

recentlyHitL1Bounds := time.Since(b.lastHitL1Bounds) < config.PollInterval*3
postedMessages := b.building.msgCount - batchPosition.MessageCount
b.messagesPerBatch.Update(uint64(postedMessages))
Expand Down
11 changes: 0 additions & 11 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -792,17 +792,6 @@ func CreateNode(
return currentNode, nil
}

func (n *Node) CacheL1PriceDataOfMsg(pos arbutil.MessageIndex, callDataUnits uint64, l1GasCharged uint64) {
n.TxStreamer.CacheL1PriceDataOfMsg(pos, callDataUnits, l1GasCharged)
}

func (n *Node) BacklogL1GasCharged() uint64 {
return n.TxStreamer.BacklogL1GasCharged()
}
func (n *Node) BacklogCallDataUnits() uint64 {
return n.TxStreamer.BacklogCallDataUnits()
}

func (n *Node) Start(ctx context.Context) error {
execClient, ok := n.Execution.(*gethexec.ExecutionNode)
if !ok {
Expand Down
122 changes: 1 addition & 121 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,6 @@ type TransactionStreamer struct {
broadcastServer *broadcaster.Broadcaster
inboxReader *InboxReader
delayedBridge *DelayedBridge

cachedL1PriceDataMutex sync.RWMutex
cachedL1PriceData *L1PriceData
}

type TransactionStreamerConfig struct {
Expand Down Expand Up @@ -118,9 +115,6 @@ func NewTransactionStreamer(
fatalErrChan: fatalErrChan,
config: config,
snapSyncConfig: snapSyncConfig,
cachedL1PriceData: &L1PriceData{
msgToL1PriceData: []L1PriceDataOfMsg{},
},
}
err := streamer.cleanupInconsistentState()
if err != nil {
Expand All @@ -129,20 +123,6 @@ func NewTransactionStreamer(
return streamer, nil
}

type L1PriceDataOfMsg struct {
callDataUnits uint64
cummulativeCallDataUnits uint64
l1GasCharged uint64
cummulativeL1GasCharged uint64
}

type L1PriceData struct {
startOfL1PriceDataCache arbutil.MessageIndex
endOfL1PriceDataCache arbutil.MessageIndex
msgToL1PriceData []L1PriceDataOfMsg
currentEstimateOfL1GasPrice uint64
}

// Represents a block's hash in the database.
// Necessary because RLP decoder doesn't produce nil values by default.
type blockHashDBValue struct {
Expand All @@ -153,106 +133,6 @@ const (
BlockHashMismatchLogMsg = "BlockHash from feed doesn't match locally computed hash. Check feed source."
)

func (s *TransactionStreamer) CurrentEstimateOfL1GasPrice() uint64 {
s.cachedL1PriceDataMutex.Lock()
defer s.cachedL1PriceDataMutex.Unlock()

currentEstimate, err := s.exec.GetL1GasPriceEstimate()
if err != nil {
log.Error("error fetching current L2 estimate of L1 gas price hence reusing cached estimate", "err", err)
} else {
s.cachedL1PriceData.currentEstimateOfL1GasPrice = currentEstimate
}
return s.cachedL1PriceData.currentEstimateOfL1GasPrice
}

func (s *TransactionStreamer) BacklogCallDataUnits() uint64 {
s.cachedL1PriceDataMutex.RLock()
defer s.cachedL1PriceDataMutex.RUnlock()

size := len(s.cachedL1PriceData.msgToL1PriceData)
if size == 0 {
return 0
}
return (s.cachedL1PriceData.msgToL1PriceData[size-1].cummulativeCallDataUnits -
s.cachedL1PriceData.msgToL1PriceData[0].cummulativeCallDataUnits +
s.cachedL1PriceData.msgToL1PriceData[0].callDataUnits)
}

func (s *TransactionStreamer) BacklogL1GasCharged() uint64 {
s.cachedL1PriceDataMutex.RLock()
defer s.cachedL1PriceDataMutex.RUnlock()

size := len(s.cachedL1PriceData.msgToL1PriceData)
if size == 0 {
return 0
}
return (s.cachedL1PriceData.msgToL1PriceData[size-1].cummulativeL1GasCharged -
s.cachedL1PriceData.msgToL1PriceData[0].cummulativeL1GasCharged +
s.cachedL1PriceData.msgToL1PriceData[0].l1GasCharged)
}

func (s *TransactionStreamer) TrimCache(to arbutil.MessageIndex) {
s.cachedL1PriceDataMutex.Lock()
defer s.cachedL1PriceDataMutex.Unlock()

if to < s.cachedL1PriceData.startOfL1PriceDataCache {
log.Info("trying to trim older cache which doesnt exist anymore")
} else if to >= s.cachedL1PriceData.endOfL1PriceDataCache {
s.cachedL1PriceData.startOfL1PriceDataCache = 0
s.cachedL1PriceData.endOfL1PriceDataCache = 0
s.cachedL1PriceData.msgToL1PriceData = []L1PriceDataOfMsg{}
} else {
newStart := to - s.cachedL1PriceData.startOfL1PriceDataCache + 1
s.cachedL1PriceData.msgToL1PriceData = s.cachedL1PriceData.msgToL1PriceData[newStart:]
s.cachedL1PriceData.startOfL1PriceDataCache = to + 1
}
}

func (s *TransactionStreamer) CacheL1PriceDataOfMsg(seqNum arbutil.MessageIndex, callDataUnits uint64, l1GasCharged uint64) {
s.cachedL1PriceDataMutex.Lock()
defer s.cachedL1PriceDataMutex.Unlock()

resetCache := func() {
s.cachedL1PriceData.startOfL1PriceDataCache = seqNum
s.cachedL1PriceData.endOfL1PriceDataCache = seqNum
s.cachedL1PriceData.msgToL1PriceData = []L1PriceDataOfMsg{{
callDataUnits: callDataUnits,
cummulativeCallDataUnits: callDataUnits,
l1GasCharged: l1GasCharged,
cummulativeL1GasCharged: l1GasCharged,
}}
}
size := len(s.cachedL1PriceData.msgToL1PriceData)
if size == 0 ||
s.cachedL1PriceData.startOfL1PriceDataCache == 0 ||
s.cachedL1PriceData.endOfL1PriceDataCache == 0 ||
arbutil.MessageIndex(size) != s.cachedL1PriceData.endOfL1PriceDataCache-s.cachedL1PriceData.startOfL1PriceDataCache+1 {
resetCache()
return
}
if seqNum != s.cachedL1PriceData.endOfL1PriceDataCache+1 {
if seqNum > s.cachedL1PriceData.endOfL1PriceDataCache+1 {
log.Info("message position higher then current end of l1 price data cache, resetting cache to this message")
resetCache()
} else if seqNum < s.cachedL1PriceData.startOfL1PriceDataCache {
log.Info("message position lower than start of l1 price data cache, ignoring")
} else {
log.Info("message position already seen in l1 price data cache, ignoring")
}
} else {
cummulativeCallDataUnits := s.cachedL1PriceData.msgToL1PriceData[size-1].cummulativeCallDataUnits
cummulativeL1GasCharged := s.cachedL1PriceData.msgToL1PriceData[size-1].cummulativeL1GasCharged
s.cachedL1PriceData.msgToL1PriceData = append(s.cachedL1PriceData.msgToL1PriceData, L1PriceDataOfMsg{
callDataUnits: callDataUnits,
cummulativeCallDataUnits: cummulativeCallDataUnits + callDataUnits,
l1GasCharged: l1GasCharged,
cummulativeL1GasCharged: cummulativeL1GasCharged + l1GasCharged,
})
s.cachedL1PriceData.endOfL1PriceDataCache = seqNum
}
}

// Encodes a uint64 as bytes in a lexically sortable manner for database iteration.
// Generally this is only used for database keys, which need sorted.
// A shorter RLP encoding is usually used for database values.
Expand Down Expand Up @@ -773,7 +653,7 @@ func (s *TransactionStreamer) AddMessagesAndEndBatch(pos arbutil.MessageIndex, m

if messagesAreConfirmed {
// Trim confirmed messages from l1pricedataCache
s.TrimCache(pos + arbutil.MessageIndex(len(messages)))
s.exec.MarkFeedStart(pos + arbutil.MessageIndex(len(messages)))
s.reorgMutex.RLock()
dups, _, _, err := s.countDuplicateMessages(pos, messagesWithBlockHash, nil)
s.reorgMutex.RUnlock()
Expand Down
Loading

0 comments on commit 4e0d39c

Please sign in to comment.