diff --git a/execution/gethexec/express_lane_service.go b/execution/gethexec/express_lane_service.go index 83eecb3510..8629a55113 100644 --- a/execution/gethexec/express_lane_service.go +++ b/execution/gethexec/express_lane_service.go @@ -15,10 +15,8 @@ import ( "github.com/ethereum/go-ethereum/arbitrum" "github.com/ethereum/go-ethereum/arbitrum_types" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/lru" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" @@ -26,14 +24,10 @@ import ( "github.com/offchainlabs/nitro/solgen/go/express_lane_auctiongen" "github.com/offchainlabs/nitro/timeboost" + "github.com/offchainlabs/nitro/util/containers" "github.com/offchainlabs/nitro/util/stopwaiter" ) -type expressLaneControl struct { - sequence uint64 - controller common.Address -} - type transactionPublisher interface { PublishTimeboostedTransaction(context.Context, *types.Transaction, *arbitrum_types.ConditionalOptions, chan struct{}) error Config() *SequencerConfig @@ -44,18 +38,31 @@ type msgAndResult struct { resultChan chan error } +type expressLaneRoundInfo struct { + sync.Mutex + sequence uint64 + msgAndResultBySequenceNumber map[uint64]*msgAndResult +} + +func (info *expressLaneRoundInfo) reset() { + info.Lock() + defer info.Unlock() + + info.sequence = 0 + info.msgAndResultBySequenceNumber = make(map[uint64]*msgAndResult) +} + type expressLaneService struct { stopwaiter.StopWaiter - sync.RWMutex - transactionPublisher transactionPublisher - auctionContractAddr common.Address - apiBackend *arbitrum.APIBackend - roundTimingInfo timeboost.RoundTimingInfo - earlySubmissionGrace time.Duration - chainConfig *params.ChainConfig - auctionContract *express_lane_auctiongen.ExpressLaneAuction - roundControl *lru.Cache[uint64, *expressLaneControl] // thread safe - msgAndResultBySequenceNumber map[uint64]*msgAndResult + transactionPublisher transactionPublisher + auctionContractAddr common.Address + apiBackend *arbitrum.APIBackend + roundTimingInfo timeboost.RoundTimingInfo + earlySubmissionGrace time.Duration + chainConfig *params.ChainConfig + auctionContract *express_lane_auctiongen.ExpressLaneAuction + roundControl containers.SyncMap[uint64, common.Address] // thread safe + roundInfo *expressLaneRoundInfo } func newExpressLaneService( @@ -96,15 +103,16 @@ pending: } return &expressLaneService{ - transactionPublisher: transactionPublisher, - auctionContract: auctionContract, - apiBackend: apiBackend, - chainConfig: chainConfig, - roundTimingInfo: *roundTimingInfo, - earlySubmissionGrace: earlySubmissionGrace, - roundControl: lru.NewCache[uint64, *expressLaneControl](8), // Keep 8 rounds cached. - auctionContractAddr: auctionContractAddr, - msgAndResultBySequenceNumber: make(map[uint64]*msgAndResult), + transactionPublisher: transactionPublisher, + auctionContract: auctionContract, + apiBackend: apiBackend, + chainConfig: chainConfig, + roundTimingInfo: *roundTimingInfo, + earlySubmissionGrace: earlySubmissionGrace, + auctionContractAddr: auctionContractAddr, + roundInfo: &expressLaneRoundInfo{ + msgAndResultBySequenceNumber: make(map[uint64]*msgAndResult), + }, }, nil } @@ -114,15 +122,16 @@ func (es *expressLaneService) Start(ctxIn context.Context) { es.LaunchThread(func(ctx context.Context) { // Log every new express lane auction round. log.Info("Watching for new express lane rounds") - waitTime := es.roundTimingInfo.TimeTilNextRound() + // Wait until the next round starts + waitTime := es.roundTimingInfo.TimeTilNextRound() select { case <-ctx.Done(): return case <-time.After(waitTime): - // First tick happened, now set up regular ticks } + // First tick happened, now set up regular ticks ticker := time.NewTicker(es.roundTimingInfo.Round) defer ticker.Stop() for { @@ -141,10 +150,11 @@ func (es *expressLaneService) Start(ctxIn context.Context) { "round", round, "timestamp", t, ) - es.Lock() - // Reset the sequence numbers map for the new round. - es.msgAndResultBySequenceNumber = make(map[uint64]*msgAndResult) - es.Unlock() + + // Cleanup previous round data. Better to do this before roundInfo reset as it prevents stale messages from being accepted + es.roundControl.Delete(round - 1) + // Reset the sequence numbers map and sequence count for the new round + es.roundInfo.reset() } }) @@ -154,11 +164,12 @@ func (es *expressLaneService) Start(ctxIn context.Context) { log.Info("Monitoring express lane auction contract") var fromBlock uint64 + maxBlockSpeed := es.transactionPublisher.Config().MaxBlockSpeed latestBlock, err := es.apiBackend.HeaderByNumber(ctx, rpc.LatestBlockNumber) if err != nil { log.Error("ExpressLaneService could not get the latest header", "err", err) } else { - maxBlocksPerRound := es.roundTimingInfo.Round / es.transactionPublisher.Config().MaxBlockSpeed + maxBlocksPerRound := es.roundTimingInfo.Round / maxBlockSpeed fromBlock = latestBlock.Number.Uint64() // #nosec G115 if fromBlock > uint64(maxBlocksPerRound) { @@ -166,7 +177,8 @@ func (es *expressLaneService) Start(ctxIn context.Context) { fromBlock -= uint64(maxBlocksPerRound) } } - ticker := time.NewTicker(es.transactionPublisher.Config().MaxBlockSpeed) + + ticker := time.NewTicker(maxBlockSpeed) defer ticker.Stop() for { select { @@ -189,6 +201,7 @@ func (es *expressLaneService) Start(ctxIn context.Context) { Start: fromBlock, End: &toBlock, } + it, err := es.auctionContract.FilterAuctionResolved(filterOpts, nil, nil, nil) if err != nil { log.Error("Could not filter auction resolutions event", "error", err) @@ -200,10 +213,7 @@ func (es *expressLaneService) Start(ctxIn context.Context) { "round", it.Event.Round, "controller", it.Event.FirstPriceExpressLaneController, ) - es.roundControl.Add(it.Event.Round, &expressLaneControl{ - controller: it.Event.FirstPriceExpressLaneController, - sequence: 0, - }) + es.roundControl.Store(it.Event.Round, it.Event.FirstPriceExpressLaneController) } setExpressLaneIterator, err := es.auctionContract.FilterSetExpressLaneController(filterOpts, nil, nil, nil) @@ -211,7 +221,6 @@ func (es *expressLaneService) Start(ctxIn context.Context) { log.Error("Could not filter express lane controller transfer event", "error", err) continue } - for setExpressLaneIterator.Next() { if (setExpressLaneIterator.Event.PreviousExpressLaneController == common.Address{}) { // The ExpressLaneAuction contract emits both AuctionResolved and SetExpressLaneController @@ -224,38 +233,39 @@ func (es *expressLaneService) Start(ctxIn context.Context) { // than trying to overload everything onto SetExpressLaneController. continue } + currentRound := es.roundTimingInfo.RoundNumber() round := setExpressLaneIterator.Event.Round - roundInfo, ok := es.roundControl.Get(round) + if round < currentRound { + log.Info("SetExpressLaneController event's round is lower than current round, not transferring control", "eventRound", round, "currentRound", currentRound) + continue + } + roundController, ok := es.roundControl.Load(round) if !ok { log.Warn("Could not find round info for ExpressLaneConroller transfer event", "round", round) continue } - if roundInfo.controller != setExpressLaneIterator.Event.PreviousExpressLaneController { + if roundController != setExpressLaneIterator.Event.PreviousExpressLaneController { log.Warn("Previous ExpressLaneController in SetExpressLaneController event does not match Sequencer previous controller, continuing with transfer to new controller anyway", "round", round, - "sequencerRoundController", roundInfo.controller, + "sequencerRoundController", roundController, "previous", setExpressLaneIterator.Event.PreviousExpressLaneController, "new", setExpressLaneIterator.Event.NewExpressLaneController) } - if roundInfo.controller == setExpressLaneIterator.Event.NewExpressLaneController { + if roundController == setExpressLaneIterator.Event.NewExpressLaneController { log.Warn("SetExpressLaneController: Previous and New ExpressLaneControllers are the same, not transferring control.", "round", round, - "previous", roundInfo.controller, + "previous", roundController, "new", setExpressLaneIterator.Event.NewExpressLaneController) continue } - - es.Lock() - // Changes to roundControl by itself are atomic but we need to udpate both roundControl - // and msgAndResultBySequenceNumber atomically here. - es.roundControl.Add(round, &expressLaneControl{ - controller: setExpressLaneIterator.Event.NewExpressLaneController, - sequence: 0, - }) - // Since the sequence number for this round has been reset to zero, the map of messages - // by sequence number must be reset otherwise old messages would be replayed. - es.msgAndResultBySequenceNumber = make(map[uint64]*msgAndResult) - es.Unlock() + es.roundControl.Store(round, setExpressLaneIterator.Event.NewExpressLaneController) + if round == currentRound && + // We dont want reset to be called when a control transfer event is right at the end of a round + // because roundInfo is primarily reset at the beginning of new round by the maintenance thread above. + // And resetting roundInfo in succession may lead to loss of valid messages + es.roundTimingInfo.TimeTilNextRound() > maxBlockSpeed { + es.roundInfo.reset() + } } fromBlock = toBlock } @@ -263,74 +273,82 @@ func (es *expressLaneService) Start(ctxIn context.Context) { } func (es *expressLaneService) currentRoundHasController() bool { - control, ok := es.roundControl.Get(es.roundTimingInfo.RoundNumber()) + controller, ok := es.roundControl.Load(es.roundTimingInfo.RoundNumber()) if !ok { return false } - return control.controller != (common.Address{}) + return controller != (common.Address{}) } -// Sequence express lane submission skips validation of the express lane message itself, -// as the core validator logic is handled in `validateExpressLaneTx“ +// sequenceExpressLaneSubmission with the roundInfo lock held, validates sequence number and sender address fields of the message +// adds the message to the transaction queue and waits for the response func (es *expressLaneService) sequenceExpressLaneSubmission( ctx context.Context, msg *timeboost.ExpressLaneSubmission, ) error { unlockByDefer := true - es.Lock() + es.roundInfo.Lock() defer func() { if unlockByDefer { - es.Unlock() + es.roundInfo.Unlock() } }() - // Although access to roundControl by itself is thread-safe, when the round control is transferred - // we need to reset roundControl and msgAndResultBySequenceNumber atomically, so the following access - // must be within the lock. - control, ok := es.roundControl.Get(msg.Round) + + // Below code block isn't a repetition, it prevents stale messages to be accepted during control transfer within or after the round ends! + controller, ok := es.roundControl.Load(msg.Round) if !ok { return timeboost.ErrNoOnchainController } + sender, err := msg.Sender() // Doesn't recompute sender address + if err != nil { + return err + } + if sender != controller { + return timeboost.ErrNotExpressLaneController + } // Check if the submission nonce is too low. - if msg.SequenceNumber < control.sequence { + if msg.SequenceNumber < es.roundInfo.sequence { return timeboost.ErrSequenceNumberTooLow } + // Check if a duplicate submission exists already, and reject if so. - if _, exists := es.msgAndResultBySequenceNumber[msg.SequenceNumber]; exists { + if _, exists := es.roundInfo.msgAndResultBySequenceNumber[msg.SequenceNumber]; exists { return timeboost.ErrDuplicateSequenceNumber } + // Log an informational warning if the message's sequence number is in the future. - if msg.SequenceNumber > control.sequence { + if msg.SequenceNumber > es.roundInfo.sequence { log.Info("Received express lane submission with future sequence number", "SequenceNumber", msg.SequenceNumber) } + // Put into the sequence number map. resultChan := make(chan error, 1) - es.msgAndResultBySequenceNumber[msg.SequenceNumber] = &msgAndResult{msg, resultChan} + es.roundInfo.msgAndResultBySequenceNumber[msg.SequenceNumber] = &msgAndResult{msg, resultChan} now := time.Now() for es.roundTimingInfo.RoundNumber() == msg.Round { // This check ensures that the controller for this round is not allowed to send transactions from msgAndResultBySequenceNumber map once the next round starts // Get the next message in the sequence. - nextMsgAndResult, exists := es.msgAndResultBySequenceNumber[control.sequence] + nextMsgAndResult, exists := es.roundInfo.msgAndResultBySequenceNumber[es.roundInfo.sequence] if !exists { break } - delete(es.msgAndResultBySequenceNumber, nextMsgAndResult.msg.SequenceNumber) + delete(es.roundInfo.msgAndResultBySequenceNumber, nextMsgAndResult.msg.SequenceNumber) txIsQueued := make(chan struct{}) es.LaunchThread(func(ctx context.Context) { nextMsgAndResult.resultChan <- es.transactionPublisher.PublishTimeboostedTransaction(ctx, nextMsgAndResult.msg.Transaction, nextMsgAndResult.msg.Options, txIsQueued) }) <-txIsQueued // Increase the global round sequence number. - control.sequence += 1 + es.roundInfo.sequence += 1 } - es.roundControl.Add(msg.Round, control) + unlockByDefer = false - es.Unlock() // Release lock so that other timeboost txs can be processed + es.roundInfo.Unlock() // Release lock so that other timeboost txs can be processed queueTimeout := es.transactionPublisher.Config().QueueTimeout abortCtx, cancel := ctxWithTimeout(ctx, queueTimeout*2) // We use the same timeout value that sequencer imposes defer cancel() - var err error select { case err = <-resultChan: case <-abortCtx.Done(): @@ -346,6 +364,7 @@ func (es *expressLaneService) sequenceExpressLaneSubmission( return nil } +// validateExpressLaneTx checks for the correctness of all fields of msg except for sequence number and sender address, those are handled by sequenceExpressLaneSubmission to avoid race func (es *expressLaneService) validateExpressLaneTx(msg *timeboost.ExpressLaneSubmission) error { if msg == nil || msg.Transaction == nil || msg.Signature == nil { return timeboost.ErrMalformedData @@ -369,35 +388,16 @@ func (es *expressLaneService) validateExpressLaneTx(msg *timeboost.ExpressLaneSu } } - control, ok := es.roundControl.Get(msg.Round) + controller, ok := es.roundControl.Load(msg.Round) if !ok { return timeboost.ErrNoOnchainController } - // Reconstruct the message being signed over and recover the sender address. - signingMessage, err := msg.ToMessageBytes() - if err != nil { - return timeboost.ErrMalformedData - } - if len(msg.Signature) != 65 { - return errors.Wrap(timeboost.ErrMalformedData, "signature length is not 65") - } - // Recover the public key. - prefixed := crypto.Keccak256(append([]byte(fmt.Sprintf("\x19Ethereum Signed Message:\n%d", len(signingMessage))), signingMessage...)) - sigItem := make([]byte, len(msg.Signature)) - copy(sigItem, msg.Signature) - - // Signature verification expects the last byte of the signature to have 27 subtracted, - // as it represents the recovery ID. If the last byte is greater than or equal to 27, it indicates a recovery ID that hasn't been adjusted yet, - // it's needed for internal signature verification logic. - if sigItem[len(sigItem)-1] >= 27 { - sigItem[len(sigItem)-1] -= 27 - } - pubkey, err := crypto.SigToPub(prefixed, sigItem) + // Extract sender address and cache it to be later used by sequenceExpressLaneSubmission + sender, err := msg.Sender() if err != nil { - return timeboost.ErrMalformedData + return err } - sender := crypto.PubkeyToAddress(*pubkey) - if sender != control.controller { + if sender != controller { return timeboost.ErrNotExpressLaneController } return nil diff --git a/execution/gethexec/express_lane_service_test.go b/execution/gethexec/express_lane_service_test.go index c003576031..67247507dd 100644 --- a/execution/gethexec/express_lane_service_test.go +++ b/execution/gethexec/express_lane_service_test.go @@ -17,7 +17,6 @@ import ( "github.com/ethereum/go-ethereum/arbitrum_types" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/lru" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/params" @@ -55,23 +54,19 @@ func Test_expressLaneService_validateExpressLaneTx(t *testing.T) { es *expressLaneService sub *timeboost.ExpressLaneSubmission expectedErr error - control expressLaneControl + controller common.Address valid bool }{ { - name: "nil msg", - sub: nil, - es: &expressLaneService{ - roundControl: lru.NewCache[uint64, *expressLaneControl](8), - }, + name: "nil msg", + sub: nil, + es: &expressLaneService{}, expectedErr: timeboost.ErrMalformedData, }, { - name: "nil tx", - sub: &timeboost.ExpressLaneSubmission{}, - es: &expressLaneService{ - roundControl: lru.NewCache[uint64, *expressLaneControl](8), - }, + name: "nil tx", + sub: &timeboost.ExpressLaneSubmission{}, + es: &expressLaneService{}, expectedErr: timeboost.ErrMalformedData, }, { @@ -79,9 +74,7 @@ func Test_expressLaneService_validateExpressLaneTx(t *testing.T) { sub: &timeboost.ExpressLaneSubmission{ Transaction: &types.Transaction{}, }, - es: &expressLaneService{ - roundControl: lru.NewCache[uint64, *expressLaneControl](8), - }, + es: &expressLaneService{}, expectedErr: timeboost.ErrMalformedData, }, { @@ -90,7 +83,6 @@ func Test_expressLaneService_validateExpressLaneTx(t *testing.T) { chainConfig: ¶ms.ChainConfig{ ChainID: big.NewInt(1), }, - roundControl: lru.NewCache[uint64, *expressLaneControl](8), }, sub: &timeboost.ExpressLaneSubmission{ ChainId: big.NewInt(2), @@ -106,7 +98,6 @@ func Test_expressLaneService_validateExpressLaneTx(t *testing.T) { chainConfig: ¶ms.ChainConfig{ ChainID: big.NewInt(1), }, - roundControl: lru.NewCache[uint64, *expressLaneControl](8), }, sub: &timeboost.ExpressLaneSubmission{ ChainId: big.NewInt(1), @@ -116,24 +107,6 @@ func Test_expressLaneService_validateExpressLaneTx(t *testing.T) { }, expectedErr: timeboost.ErrWrongAuctionContract, }, - { - name: "no onchain controller", - es: &expressLaneService{ - auctionContractAddr: common.Address{'a'}, - roundTimingInfo: defaultTestRoundTimingInfo(time.Now()), - chainConfig: ¶ms.ChainConfig{ - ChainID: big.NewInt(1), - }, - roundControl: lru.NewCache[uint64, *expressLaneControl](8), - }, - sub: &timeboost.ExpressLaneSubmission{ - ChainId: big.NewInt(1), - AuctionContractAddress: common.Address{'a'}, - Transaction: &types.Transaction{}, - Signature: []byte{'b'}, - }, - expectedErr: timeboost.ErrNoOnchainController, - }, { name: "bad round number", es: &expressLaneService{ @@ -142,11 +115,8 @@ func Test_expressLaneService_validateExpressLaneTx(t *testing.T) { chainConfig: ¶ms.ChainConfig{ ChainID: big.NewInt(1), }, - roundControl: lru.NewCache[uint64, *expressLaneControl](8), - }, - control: expressLaneControl{ - controller: common.Address{'b'}, }, + controller: common.Address{'b'}, sub: &timeboost.ExpressLaneSubmission{ ChainId: big.NewInt(1), AuctionContractAddress: common.Address{'a'}, @@ -164,11 +134,9 @@ func Test_expressLaneService_validateExpressLaneTx(t *testing.T) { chainConfig: ¶ms.ChainConfig{ ChainID: big.NewInt(1), }, - roundControl: lru.NewCache[uint64, *expressLaneControl](8), - }, - control: expressLaneControl{ - controller: common.Address{'b'}, }, + controller: common.Address{'b'}, + sub: &timeboost.ExpressLaneSubmission{ ChainId: big.NewInt(1), AuctionContractAddress: common.Address{'a'}, @@ -186,14 +154,31 @@ func Test_expressLaneService_validateExpressLaneTx(t *testing.T) { chainConfig: ¶ms.ChainConfig{ ChainID: big.NewInt(1), }, - roundControl: lru.NewCache[uint64, *expressLaneControl](8), - }, - control: expressLaneControl{ - controller: common.Address{'b'}, + roundInfo: &expressLaneRoundInfo{ + msgAndResultBySequenceNumber: make(map[uint64]*msgAndResult), + }, }, + controller: common.Address{'b'}, sub: buildInvalidSignatureSubmission(t, common.HexToAddress("0x2Aef36410182881a4b13664a1E079762D7F716e6")), expectedErr: timeboost.ErrNotExpressLaneController, }, + { + name: "no onchain controller", + es: &expressLaneService{ + auctionContractAddr: common.Address{'a'}, + roundTimingInfo: defaultTestRoundTimingInfo(time.Now()), + chainConfig: ¶ms.ChainConfig{ + ChainID: big.NewInt(1), + }, + }, + sub: &timeboost.ExpressLaneSubmission{ + ChainId: big.NewInt(1), + AuctionContractAddress: common.Address{'a'}, + Transaction: &types.Transaction{}, + Signature: []byte{'b'}, + }, + expectedErr: timeboost.ErrNoOnchainController, + }, { name: "not express lane controller", es: &expressLaneService{ @@ -202,11 +187,11 @@ func Test_expressLaneService_validateExpressLaneTx(t *testing.T) { chainConfig: ¶ms.ChainConfig{ ChainID: big.NewInt(1), }, - roundControl: lru.NewCache[uint64, *expressLaneControl](8), - }, - control: expressLaneControl{ - controller: common.Address{'b'}, + roundInfo: &expressLaneRoundInfo{ + msgAndResultBySequenceNumber: make(map[uint64]*msgAndResult), + }, }, + controller: common.Address{'b'}, sub: buildValidSubmission(t, common.HexToAddress("0x2Aef36410182881a4b13664a1E079762D7F716e6"), testPriv, 0), expectedErr: timeboost.ErrNotExpressLaneController, }, @@ -218,13 +203,10 @@ func Test_expressLaneService_validateExpressLaneTx(t *testing.T) { chainConfig: ¶ms.ChainConfig{ ChainID: big.NewInt(1), }, - roundControl: lru.NewCache[uint64, *expressLaneControl](8), - }, - control: expressLaneControl{ - controller: crypto.PubkeyToAddress(testPriv.PublicKey), }, - sub: buildValidSubmission(t, common.HexToAddress("0x2Aef36410182881a4b13664a1E079762D7F716e6"), testPriv, 0), - valid: true, + controller: crypto.PubkeyToAddress(testPriv.PublicKey), + sub: buildValidSubmission(t, common.HexToAddress("0x2Aef36410182881a4b13664a1E079762D7F716e6"), testPriv, 0), + valid: true, }, } @@ -232,7 +214,7 @@ func Test_expressLaneService_validateExpressLaneTx(t *testing.T) { tt := _tt t.Run(tt.name, func(t *testing.T) { if tt.sub != nil && !errors.Is(tt.expectedErr, timeboost.ErrNoOnchainController) { - tt.es.roundControl.Add(tt.sub.Round, &tt.control) + tt.es.roundControl.Store(tt.sub.Round, tt.controller) } err := tt.es.validateExpressLaneTx(tt.sub) if tt.valid { @@ -257,14 +239,9 @@ func Test_expressLaneService_validateExpressLaneTx_gracePeriod(t *testing.T) { chainConfig: ¶ms.ChainConfig{ ChainID: big.NewInt(1), }, - roundControl: lru.NewCache[uint64, *expressLaneControl](8), } - es.roundControl.Add(0, &expressLaneControl{ - controller: crypto.PubkeyToAddress(testPriv.PublicKey), - }) - es.roundControl.Add(1, &expressLaneControl{ - controller: crypto.PubkeyToAddress(testPriv2.PublicKey), - }) + es.roundControl.Store(0, crypto.PubkeyToAddress(testPriv.PublicKey)) + es.roundControl.Store(1, crypto.PubkeyToAddress(testPriv2.PublicKey)) sub1 := buildValidSubmission(t, auctionContractAddr, testPriv, 0) err := es.validateExpressLaneTx(sub1) @@ -305,8 +282,7 @@ func (s *stubPublisher) PublishTimeboostedTransaction(parentCtx context.Context, if tx.Hash() != emptyTx.Hash() { return errors.New("oops, bad tx") } - control, _ := s.els.roundControl.Get(0) - s.publishedTxOrder = append(s.publishedTxOrder, control.sequence) + s.publishedTxOrder = append(s.publishedTxOrder, 0) return nil } @@ -319,18 +295,18 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_nonceTooLow(t *testin ctx, cancel := context.WithCancel(context.Background()) defer cancel() els := &expressLaneService{ - msgAndResultBySequenceNumber: make(map[uint64]*msgAndResult), - roundControl: lru.NewCache[uint64, *expressLaneControl](8), + roundInfo: &expressLaneRoundInfo{ + msgAndResultBySequenceNumber: make(map[uint64]*msgAndResult), + }, } els.StopWaiter.Start(ctx, els) + els.roundControl.Store(0, crypto.PubkeyToAddress(testPriv.PublicKey)) + els.roundInfo.Lock() + els.roundInfo.sequence = 1 + els.roundInfo.Unlock() stubPublisher := makeStubPublisher(els) els.transactionPublisher = stubPublisher - els.roundControl.Add(0, &expressLaneControl{ - sequence: 1, - }) - msg := &timeboost.ExpressLaneSubmission{ - SequenceNumber: 0, - } + msg := buildValidSubmissionWithSeqAndTx(t, 0, 0, emptyTx) err := els.sequenceExpressLaneSubmission(ctx, msg) require.ErrorIs(t, err, timeboost.ErrSequenceNumberTooLow) @@ -340,20 +316,20 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_duplicateNonce(t *tes ctx, cancel := context.WithCancel(context.Background()) defer cancel() els := &expressLaneService{ - roundControl: lru.NewCache[uint64, *expressLaneControl](8), - msgAndResultBySequenceNumber: make(map[uint64]*msgAndResult), - roundTimingInfo: defaultTestRoundTimingInfo(time.Now()), + roundInfo: &expressLaneRoundInfo{ + msgAndResultBySequenceNumber: make(map[uint64]*msgAndResult), + }, + roundTimingInfo: defaultTestRoundTimingInfo(time.Now()), } els.StopWaiter.Start(ctx, els) + els.roundControl.Store(0, crypto.PubkeyToAddress(testPriv.PublicKey)) + els.roundInfo.Lock() + els.roundInfo.sequence = 1 + els.roundInfo.Unlock() stubPublisher := makeStubPublisher(els) els.transactionPublisher = stubPublisher - els.roundControl.Add(0, &expressLaneControl{ - sequence: 1, - }) - msg := &timeboost.ExpressLaneSubmission{ - SequenceNumber: 2, - Transaction: types.NewTx(&types.DynamicFeeTx{Data: []byte{1}}), - } + + msg := buildValidSubmissionWithSeqAndTx(t, 0, 2, types.NewTx(&types.DynamicFeeTx{Data: []byte{1}})) var wg sync.WaitGroup wg.Add(3) // We expect only of the below two to return with an error here var err1, err2 error @@ -383,40 +359,27 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_outOfOrder(t *testing ctx, cancel := context.WithCancel(context.Background()) defer cancel() els := &expressLaneService{ - roundControl: lru.NewCache[uint64, *expressLaneControl](8), - msgAndResultBySequenceNumber: make(map[uint64]*msgAndResult), - roundTimingInfo: defaultTestRoundTimingInfo(time.Now()), + roundInfo: &expressLaneRoundInfo{ + msgAndResultBySequenceNumber: make(map[uint64]*msgAndResult), + }, + roundTimingInfo: defaultTestRoundTimingInfo(time.Now()), } els.StopWaiter.Start(ctx, els) + els.roundControl.Store(0, crypto.PubkeyToAddress(testPriv.PublicKey)) + els.roundInfo.Lock() + els.roundInfo.sequence = 1 + els.roundInfo.Unlock() stubPublisher := makeStubPublisher(els) els.transactionPublisher = stubPublisher - els.roundControl.Add(0, &expressLaneControl{ - sequence: 1, - }) - messages := []*timeboost.ExpressLaneSubmission{ - { - SequenceNumber: 10, - Transaction: types.NewTransaction(0, common.MaxAddress, big.NewInt(0), 0, big.NewInt(0), []byte{1}), - }, - { - SequenceNumber: 5, - Transaction: emptyTx, - }, - { - SequenceNumber: 1, - Transaction: emptyTx, - }, - { - SequenceNumber: 4, - Transaction: emptyTx, - }, - { - SequenceNumber: 2, - Transaction: emptyTx, - }, + buildValidSubmissionWithSeqAndTx(t, 0, 10, types.NewTransaction(0, common.MaxAddress, big.NewInt(0), 0, big.NewInt(0), []byte{1})), + buildValidSubmissionWithSeqAndTx(t, 0, 5, emptyTx), + buildValidSubmissionWithSeqAndTx(t, 0, 1, emptyTx), + buildValidSubmissionWithSeqAndTx(t, 0, 4, emptyTx), + buildValidSubmissionWithSeqAndTx(t, 0, 2, emptyTx), } + // We launch 5 goroutines out of which 2 would return with a result hence we initially add a delta of 7 var wg sync.WaitGroup wg.Add(7) @@ -435,53 +398,43 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_outOfOrder(t *testing // We should have only published 2, as we are missing sequence number 3. time.Sleep(2 * time.Second) require.Equal(t, 2, len(stubPublisher.publishedTxOrder)) - els.Lock() - require.Equal(t, 3, len(els.msgAndResultBySequenceNumber)) // Processed txs are deleted - els.Unlock() + els.roundInfo.Lock() + require.Equal(t, 3, len(els.roundInfo.msgAndResultBySequenceNumber)) // Processed txs are deleted + els.roundInfo.Unlock() wg.Add(2) // 4 & 5 should be able to get in after 3 so we add a delta of 2 - err := els.sequenceExpressLaneSubmission(ctx, &timeboost.ExpressLaneSubmission{SequenceNumber: 3, Transaction: emptyTx}) + err := els.sequenceExpressLaneSubmission(ctx, buildValidSubmissionWithSeqAndTx(t, 0, 3, emptyTx)) require.NoError(t, err) wg.Wait() require.Equal(t, 5, len(stubPublisher.publishedTxOrder)) - els.Lock() - require.Equal(t, 1, len(els.msgAndResultBySequenceNumber)) // Tx with seq num 10 should still be present - els.Unlock() + els.roundInfo.Lock() + require.Equal(t, 1, len(els.roundInfo.msgAndResultBySequenceNumber)) // Tx with seq num 10 should still be present + els.roundInfo.Unlock() } func Test_expressLaneService_sequenceExpressLaneSubmission_erroredTx(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() els := &expressLaneService{ - roundControl: lru.NewCache[uint64, *expressLaneControl](8), - msgAndResultBySequenceNumber: make(map[uint64]*msgAndResult), - roundTimingInfo: defaultTestRoundTimingInfo(time.Now()), + roundInfo: &expressLaneRoundInfo{ + msgAndResultBySequenceNumber: make(map[uint64]*msgAndResult), + }, + roundTimingInfo: defaultTestRoundTimingInfo(time.Now()), } els.StopWaiter.Start(ctx, els) - els.roundControl.Add(0, &expressLaneControl{ - sequence: 1, - }) + els.roundControl.Store(0, crypto.PubkeyToAddress(testPriv.PublicKey)) + els.roundInfo.Lock() + els.roundInfo.sequence = 1 + els.roundInfo.Unlock() stubPublisher := makeStubPublisher(els) els.transactionPublisher = stubPublisher messages := []*timeboost.ExpressLaneSubmission{ - { - SequenceNumber: 1, - Transaction: emptyTx, - }, - { - SequenceNumber: 2, - Transaction: types.NewTransaction(0, common.MaxAddress, big.NewInt(0), 0, big.NewInt(0), []byte{1}), - }, - { - SequenceNumber: 3, - Transaction: emptyTx, - }, - { - SequenceNumber: 4, - Transaction: emptyTx, - }, + buildValidSubmissionWithSeqAndTx(t, 0, 1, emptyTx), + buildValidSubmissionWithSeqAndTx(t, 0, 2, types.NewTransaction(0, common.MaxAddress, big.NewInt(0), 0, big.NewInt(0), []byte{1})), + buildValidSubmissionWithSeqAndTx(t, 0, 3, emptyTx), + buildValidSubmissionWithSeqAndTx(t, 0, 4, emptyTx), } for _, msg := range messages { if msg.Transaction.Hash() != emptyTx.Hash() { @@ -492,13 +445,12 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_erroredTx(t *testing. require.NoError(t, err) } } + // One tx out of the four should have failed, so we should have only published 3. // Since sequence number 2 failed after submission stage, that nonce is used up require.Equal(t, 3, len(stubPublisher.publishedTxOrder)) - require.Equal(t, []uint64{1, 3, 4}, stubPublisher.publishedTxOrder) } -// TODO this test is just for RoundTimingInfo func TestIsWithinAuctionCloseWindow(t *testing.T) { initialTimestamp := time.Date(2024, 8, 8, 15, 0, 0, 0, time.UTC) roundTimingInfo := defaultTestRoundTimingInfo(initialTimestamp) @@ -551,15 +503,16 @@ func Benchmark_expressLaneService_validateExpressLaneTx(b *testing.B) { es := &expressLaneService{ auctionContractAddr: common.HexToAddress("0x2Aef36410182881a4b13664a1E079762D7F716e6"), roundTimingInfo: defaultTestRoundTimingInfo(time.Now()), - roundControl: lru.NewCache[uint64, *expressLaneControl](8), + roundInfo: &expressLaneRoundInfo{}, chainConfig: ¶ms.ChainConfig{ ChainID: big.NewInt(1), }, } - es.roundControl.Add(0, &expressLaneControl{ - sequence: 1, - controller: addr, - }) + es.roundControl.Store(0, addr) + es.roundInfo.Lock() + es.roundInfo.sequence = 1 + es.roundInfo.Unlock() + sub := buildValidSubmission(b, common.HexToAddress("0x2Aef36410182881a4b13664a1E079762D7F716e6"), testPriv, 0) b.StartTimer() for i := 0; i < b.N; i++ { @@ -625,3 +578,25 @@ func buildValidSubmission( b.Signature = signature return b } + +func buildValidSubmissionWithSeqAndTx( + t testing.TB, + round uint64, + seq uint64, + tx *types.Transaction, +) *timeboost.ExpressLaneSubmission { + b := &timeboost.ExpressLaneSubmission{ + ChainId: big.NewInt(1), + AuctionContractAddress: common.HexToAddress("0x2Aef36410182881a4b13664a1E079762D7F716e6"), + Transaction: tx, + Signature: make([]byte, 65), + Round: round, + SequenceNumber: seq, + } + data, err := b.ToMessageBytes() + require.NoError(t, err) + signature, err := buildSignature(testPriv, data) + require.NoError(t, err) + b.Signature = signature + return b +} diff --git a/timeboost/types.go b/timeboost/types.go index 73e2e0d2b6..01a60b8484 100644 --- a/timeboost/types.go +++ b/timeboost/types.go @@ -3,8 +3,11 @@ package timeboost import ( "bytes" "encoding/binary" + "fmt" "math/big" + "github.com/pkg/errors" + "github.com/ethereum/go-ethereum/arbitrum_types" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -176,6 +179,8 @@ type ExpressLaneSubmission struct { Options *arbitrum_types.ConditionalOptions `json:"options"` SequenceNumber uint64 Signature []byte + + sender common.Address } func JsonSubmissionToGo(submission *JsonExpressLaneSubmission) (*ExpressLaneSubmission, error) { @@ -229,6 +234,36 @@ func (els *ExpressLaneSubmission) ToMessageBytes() ([]byte, error) { return buf.Bytes(), nil } +func (els *ExpressLaneSubmission) Sender() (common.Address, error) { + if (els.sender != common.Address{}) { + return els.sender, nil + } + // Reconstruct the message being signed over and recover the sender address. + signingMessage, err := els.ToMessageBytes() + if err != nil { + return common.Address{}, ErrMalformedData + } + if len(els.Signature) != 65 { + return common.Address{}, errors.Wrap(ErrMalformedData, "signature length is not 65") + } + // Recover the public key. + prefixed := crypto.Keccak256(append([]byte(fmt.Sprintf("\x19Ethereum Signed Message:\n%d", len(signingMessage))), signingMessage...)) + sigItem := make([]byte, len(els.Signature)) + copy(sigItem, els.Signature) + // Signature verification expects the last byte of the signature to have 27 subtracted, + // as it represents the recovery ID. If the last byte is greater than or equal to 27, it indicates a recovery ID that hasn't been adjusted yet, + // it's needed for internal signature verification logic. + if sigItem[len(sigItem)-1] >= 27 { + sigItem[len(sigItem)-1] -= 27 + } + pubkey, err := crypto.SigToPub(prefixed, sigItem) + if err != nil { + return common.Address{}, ErrMalformedData + } + els.sender = crypto.PubkeyToAddress(*pubkey) + return els.sender, nil +} + // Helper function to pad a big integer to 32 bytes func padBigInt(bi *big.Int) []byte { bb := bi.Bytes()