Skip to content

Commit

Permalink
solve race in expressLaneService
Browse files Browse the repository at this point in the history
  • Loading branch information
ganeshvanahalli committed Jan 3, 2025
1 parent cf6b606 commit 6d8f406
Show file tree
Hide file tree
Showing 3 changed files with 258 additions and 248 deletions.
200 changes: 100 additions & 100 deletions execution/gethexec/express_lane_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,19 @@ import (
"github.com/ethereum/go-ethereum/arbitrum"
"github.com/ethereum/go-ethereum/arbitrum_types"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/lru"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"

"github.com/offchainlabs/nitro/solgen/go/express_lane_auctiongen"
"github.com/offchainlabs/nitro/timeboost"
"github.com/offchainlabs/nitro/util/containers"
"github.com/offchainlabs/nitro/util/stopwaiter"
)

type expressLaneControl struct {
sequence uint64
controller common.Address
}

type transactionPublisher interface {
PublishTimeboostedTransaction(context.Context, *types.Transaction, *arbitrum_types.ConditionalOptions, chan struct{}) error
Config() *SequencerConfig
Expand All @@ -44,18 +38,31 @@ type msgAndResult struct {
resultChan chan error
}

type expressLaneRoundInfo struct {
sync.Mutex
sequence uint64
msgAndResultBySequenceNumber map[uint64]*msgAndResult
}

func (info *expressLaneRoundInfo) reset() {
info.Lock()
defer info.Unlock()

info.sequence = 0
info.msgAndResultBySequenceNumber = make(map[uint64]*msgAndResult)
}

type expressLaneService struct {
stopwaiter.StopWaiter
sync.RWMutex
transactionPublisher transactionPublisher
auctionContractAddr common.Address
apiBackend *arbitrum.APIBackend
roundTimingInfo timeboost.RoundTimingInfo
earlySubmissionGrace time.Duration
chainConfig *params.ChainConfig
auctionContract *express_lane_auctiongen.ExpressLaneAuction
roundControl *lru.Cache[uint64, *expressLaneControl] // thread safe
msgAndResultBySequenceNumber map[uint64]*msgAndResult
transactionPublisher transactionPublisher
auctionContractAddr common.Address
apiBackend *arbitrum.APIBackend
roundTimingInfo timeboost.RoundTimingInfo
earlySubmissionGrace time.Duration
chainConfig *params.ChainConfig
auctionContract *express_lane_auctiongen.ExpressLaneAuction
roundControl containers.SyncMap[uint64, common.Address] // thread safe
roundInfo *expressLaneRoundInfo
}

func newExpressLaneService(
Expand Down Expand Up @@ -96,15 +103,16 @@ pending:
}

return &expressLaneService{
transactionPublisher: transactionPublisher,
auctionContract: auctionContract,
apiBackend: apiBackend,
chainConfig: chainConfig,
roundTimingInfo: *roundTimingInfo,
earlySubmissionGrace: earlySubmissionGrace,
roundControl: lru.NewCache[uint64, *expressLaneControl](8), // Keep 8 rounds cached.
auctionContractAddr: auctionContractAddr,
msgAndResultBySequenceNumber: make(map[uint64]*msgAndResult),
transactionPublisher: transactionPublisher,
auctionContract: auctionContract,
apiBackend: apiBackend,
chainConfig: chainConfig,
roundTimingInfo: *roundTimingInfo,
earlySubmissionGrace: earlySubmissionGrace,
auctionContractAddr: auctionContractAddr,
roundInfo: &expressLaneRoundInfo{
msgAndResultBySequenceNumber: make(map[uint64]*msgAndResult),
},
}, nil
}

Expand All @@ -114,15 +122,16 @@ func (es *expressLaneService) Start(ctxIn context.Context) {
es.LaunchThread(func(ctx context.Context) {
// Log every new express lane auction round.
log.Info("Watching for new express lane rounds")
waitTime := es.roundTimingInfo.TimeTilNextRound()

// Wait until the next round starts
waitTime := es.roundTimingInfo.TimeTilNextRound()
select {
case <-ctx.Done():
return
case <-time.After(waitTime):
// First tick happened, now set up regular ticks
}

// First tick happened, now set up regular ticks
ticker := time.NewTicker(es.roundTimingInfo.Round)
defer ticker.Stop()
for {
Expand All @@ -141,10 +150,11 @@ func (es *expressLaneService) Start(ctxIn context.Context) {
"round", round,
"timestamp", t,
)
es.Lock()
// Reset the sequence numbers map for the new round.
es.msgAndResultBySequenceNumber = make(map[uint64]*msgAndResult)
es.Unlock()

// Cleanup previous round data. Better to do this before roundInfo reset as it prevents stale messages from being accepted
es.roundControl.Delete(round - 1)
// Reset the sequence numbers map and sequence count for the new round
es.roundInfo.reset()
}
})

Expand All @@ -154,19 +164,21 @@ func (es *expressLaneService) Start(ctxIn context.Context) {
log.Info("Monitoring express lane auction contract")

var fromBlock uint64
maxBlockSpeed := es.transactionPublisher.Config().MaxBlockSpeed
latestBlock, err := es.apiBackend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
if err != nil {
log.Error("ExpressLaneService could not get the latest header", "err", err)
} else {
maxBlocksPerRound := es.roundTimingInfo.Round / es.transactionPublisher.Config().MaxBlockSpeed
maxBlocksPerRound := es.roundTimingInfo.Round / maxBlockSpeed
fromBlock = latestBlock.Number.Uint64()
// #nosec G115
if fromBlock > uint64(maxBlocksPerRound) {
// #nosec G115
fromBlock -= uint64(maxBlocksPerRound)
}
}
ticker := time.NewTicker(es.transactionPublisher.Config().MaxBlockSpeed)

ticker := time.NewTicker(maxBlockSpeed)
defer ticker.Stop()
for {
select {
Expand All @@ -189,6 +201,7 @@ func (es *expressLaneService) Start(ctxIn context.Context) {
Start: fromBlock,
End: &toBlock,
}

it, err := es.auctionContract.FilterAuctionResolved(filterOpts, nil, nil, nil)
if err != nil {
log.Error("Could not filter auction resolutions event", "error", err)
Expand All @@ -200,18 +213,14 @@ func (es *expressLaneService) Start(ctxIn context.Context) {
"round", it.Event.Round,
"controller", it.Event.FirstPriceExpressLaneController,
)
es.roundControl.Add(it.Event.Round, &expressLaneControl{
controller: it.Event.FirstPriceExpressLaneController,
sequence: 0,
})
es.roundControl.Store(it.Event.Round, it.Event.FirstPriceExpressLaneController)
}

setExpressLaneIterator, err := es.auctionContract.FilterSetExpressLaneController(filterOpts, nil, nil, nil)
if err != nil {
log.Error("Could not filter express lane controller transfer event", "error", err)
continue
}

for setExpressLaneIterator.Next() {
if (setExpressLaneIterator.Event.PreviousExpressLaneController == common.Address{}) {
// The ExpressLaneAuction contract emits both AuctionResolved and SetExpressLaneController
Expand All @@ -224,113 +233,122 @@ func (es *expressLaneService) Start(ctxIn context.Context) {
// than trying to overload everything onto SetExpressLaneController.
continue
}
currentRound := es.roundTimingInfo.RoundNumber()
round := setExpressLaneIterator.Event.Round
roundInfo, ok := es.roundControl.Get(round)
if round < currentRound {
log.Info("SetExpressLaneController event's round is lower than current round, not transferring control", "eventRound", round, "currentRound", currentRound)
continue
}
roundController, ok := es.roundControl.Load(round)
if !ok {
log.Warn("Could not find round info for ExpressLaneConroller transfer event", "round", round)
continue
}
if roundInfo.controller != setExpressLaneIterator.Event.PreviousExpressLaneController {
if roundController != setExpressLaneIterator.Event.PreviousExpressLaneController {
log.Warn("Previous ExpressLaneController in SetExpressLaneController event does not match Sequencer previous controller, continuing with transfer to new controller anyway",
"round", round,
"sequencerRoundController", roundInfo.controller,
"sequencerRoundController", roundController,
"previous", setExpressLaneIterator.Event.PreviousExpressLaneController,
"new", setExpressLaneIterator.Event.NewExpressLaneController)
}
if roundInfo.controller == setExpressLaneIterator.Event.NewExpressLaneController {
if roundController == setExpressLaneIterator.Event.NewExpressLaneController {
log.Warn("SetExpressLaneController: Previous and New ExpressLaneControllers are the same, not transferring control.",
"round", round,
"previous", roundInfo.controller,
"previous", roundController,
"new", setExpressLaneIterator.Event.NewExpressLaneController)
continue
}

es.Lock()
// Changes to roundControl by itself are atomic but we need to udpate both roundControl
// and msgAndResultBySequenceNumber atomically here.
es.roundControl.Add(round, &expressLaneControl{
controller: setExpressLaneIterator.Event.NewExpressLaneController,
sequence: 0,
})
// Since the sequence number for this round has been reset to zero, the map of messages
// by sequence number must be reset otherwise old messages would be replayed.
es.msgAndResultBySequenceNumber = make(map[uint64]*msgAndResult)
es.Unlock()
es.roundControl.Store(round, setExpressLaneIterator.Event.NewExpressLaneController)
if round == currentRound &&
// We dont want reset to be called when a control transfer event is right at the end of a round
// because roundInfo is primarily reset at the beginning of new round by the maintenance thread above.
// And resetting roundInfo in succession may lead to loss of valid messages
es.roundTimingInfo.TimeTilNextRound() > maxBlockSpeed {
es.roundInfo.reset()
}
}
fromBlock = toBlock
}
})
}

func (es *expressLaneService) currentRoundHasController() bool {
control, ok := es.roundControl.Get(es.roundTimingInfo.RoundNumber())
controller, ok := es.roundControl.Load(es.roundTimingInfo.RoundNumber())
if !ok {
return false
}
return control.controller != (common.Address{})
return controller != (common.Address{})
}

// Sequence express lane submission skips validation of the express lane message itself,
// as the core validator logic is handled in `validateExpressLaneTx“
// sequenceExpressLaneSubmission with the roundInfo lock held, validates sequence number and sender address fields of the message
// adds the message to the transaction queue and waits for the response
func (es *expressLaneService) sequenceExpressLaneSubmission(
ctx context.Context,
msg *timeboost.ExpressLaneSubmission,
) error {
unlockByDefer := true
es.Lock()
es.roundInfo.Lock()
defer func() {
if unlockByDefer {
es.Unlock()
es.roundInfo.Unlock()
}
}()
// Although access to roundControl by itself is thread-safe, when the round control is transferred
// we need to reset roundControl and msgAndResultBySequenceNumber atomically, so the following access
// must be within the lock.
control, ok := es.roundControl.Get(msg.Round)

// Below code block isn't a repetition, it prevents stale messages to be accepted during control transfer within or after the round ends!
controller, ok := es.roundControl.Load(msg.Round)
if !ok {
return timeboost.ErrNoOnchainController
}
sender, err := msg.Sender() // Doesn't recompute sender address
if err != nil {
return err
}
if sender != controller {
return timeboost.ErrNotExpressLaneController
}

// Check if the submission nonce is too low.
if msg.SequenceNumber < control.sequence {
if msg.SequenceNumber < es.roundInfo.sequence {
return timeboost.ErrSequenceNumberTooLow
}

// Check if a duplicate submission exists already, and reject if so.
if _, exists := es.msgAndResultBySequenceNumber[msg.SequenceNumber]; exists {
if _, exists := es.roundInfo.msgAndResultBySequenceNumber[msg.SequenceNumber]; exists {
return timeboost.ErrDuplicateSequenceNumber
}

// Log an informational warning if the message's sequence number is in the future.
if msg.SequenceNumber > control.sequence {
if msg.SequenceNumber > es.roundInfo.sequence {
log.Info("Received express lane submission with future sequence number", "SequenceNumber", msg.SequenceNumber)
}

// Put into the sequence number map.
resultChan := make(chan error, 1)
es.msgAndResultBySequenceNumber[msg.SequenceNumber] = &msgAndResult{msg, resultChan}
es.roundInfo.msgAndResultBySequenceNumber[msg.SequenceNumber] = &msgAndResult{msg, resultChan}

now := time.Now()
for es.roundTimingInfo.RoundNumber() == msg.Round { // This check ensures that the controller for this round is not allowed to send transactions from msgAndResultBySequenceNumber map once the next round starts
// Get the next message in the sequence.
nextMsgAndResult, exists := es.msgAndResultBySequenceNumber[control.sequence]
nextMsgAndResult, exists := es.roundInfo.msgAndResultBySequenceNumber[es.roundInfo.sequence]
if !exists {
break
}
delete(es.msgAndResultBySequenceNumber, nextMsgAndResult.msg.SequenceNumber)
delete(es.roundInfo.msgAndResultBySequenceNumber, nextMsgAndResult.msg.SequenceNumber)
txIsQueued := make(chan struct{})
es.LaunchThread(func(ctx context.Context) {
nextMsgAndResult.resultChan <- es.transactionPublisher.PublishTimeboostedTransaction(ctx, nextMsgAndResult.msg.Transaction, nextMsgAndResult.msg.Options, txIsQueued)
})
<-txIsQueued
// Increase the global round sequence number.
control.sequence += 1
es.roundInfo.sequence += 1
}
es.roundControl.Add(msg.Round, control)

unlockByDefer = false
es.Unlock() // Release lock so that other timeboost txs can be processed
es.roundInfo.Unlock() // Release lock so that other timeboost txs can be processed

queueTimeout := es.transactionPublisher.Config().QueueTimeout
abortCtx, cancel := ctxWithTimeout(ctx, queueTimeout*2) // We use the same timeout value that sequencer imposes
defer cancel()
var err error
select {
case err = <-resultChan:
case <-abortCtx.Done():
Expand All @@ -346,6 +364,7 @@ func (es *expressLaneService) sequenceExpressLaneSubmission(
return nil
}

// validateExpressLaneTx checks for the correctness of all fields of msg except for sequence number and sender address, those are handled by sequenceExpressLaneSubmission to avoid race
func (es *expressLaneService) validateExpressLaneTx(msg *timeboost.ExpressLaneSubmission) error {
if msg == nil || msg.Transaction == nil || msg.Signature == nil {
return timeboost.ErrMalformedData
Expand All @@ -369,35 +388,16 @@ func (es *expressLaneService) validateExpressLaneTx(msg *timeboost.ExpressLaneSu
}
}

control, ok := es.roundControl.Get(msg.Round)
controller, ok := es.roundControl.Load(msg.Round)
if !ok {
return timeboost.ErrNoOnchainController
}
// Reconstruct the message being signed over and recover the sender address.
signingMessage, err := msg.ToMessageBytes()
if err != nil {
return timeboost.ErrMalformedData
}
if len(msg.Signature) != 65 {
return errors.Wrap(timeboost.ErrMalformedData, "signature length is not 65")
}
// Recover the public key.
prefixed := crypto.Keccak256(append([]byte(fmt.Sprintf("\x19Ethereum Signed Message:\n%d", len(signingMessage))), signingMessage...))
sigItem := make([]byte, len(msg.Signature))
copy(sigItem, msg.Signature)

// Signature verification expects the last byte of the signature to have 27 subtracted,
// as it represents the recovery ID. If the last byte is greater than or equal to 27, it indicates a recovery ID that hasn't been adjusted yet,
// it's needed for internal signature verification logic.
if sigItem[len(sigItem)-1] >= 27 {
sigItem[len(sigItem)-1] -= 27
}
pubkey, err := crypto.SigToPub(prefixed, sigItem)
// Extract sender address and cache it to be later used by sequenceExpressLaneSubmission
sender, err := msg.Sender()
if err != nil {
return timeboost.ErrMalformedData
return err
}
sender := crypto.PubkeyToAddress(*pubkey)
if sender != control.controller {
if sender != controller {
return timeboost.ErrNotExpressLaneController
}
return nil
Expand Down
Loading

0 comments on commit 6d8f406

Please sign in to comment.