Skip to content

Commit

Permalink
Merge pull request #2926 from OffchainLabs/fix-readingpendingelmsgs-r…
Browse files Browse the repository at this point in the history
…edis

Fix reading of pending ExpressLane messages from redis
  • Loading branch information
tsahee authored Feb 5, 2025
2 parents fa41878 + 7bd529d commit 4d43771
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 53 deletions.
7 changes: 6 additions & 1 deletion execution/gethexec/express_lane_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,9 @@ func (es *expressLaneService) sequenceExpressLaneSubmission(
len(roundInfo.msgAndResultBySequenceNumber)-int(roundInfo.sequence) >= seqConfig.Dangerous.Timeboost.MaxQueuedTxCount {
return fmt.Errorf("reached limit for queuing of future sequence number transactions, please try again with the correct sequence number. Limit: %d, Current sequence number: %d", seqConfig.Dangerous.Timeboost.MaxQueuedTxCount, roundInfo.sequence)
}
if msg.SequenceNumber > roundInfo.sequence+seqConfig.Dangerous.Timeboost.MaxFutureSequenceDistance {
return fmt.Errorf("message sequence number has reached max allowed limit. SequenceNumber: %d, Limit: %d", msg.SequenceNumber, roundInfo.sequence+seqConfig.Dangerous.Timeboost.MaxFutureSequenceDistance)
}
log.Info("Received express lane submission with future sequence number", "SequenceNumber", msg.SequenceNumber)
}

Expand Down Expand Up @@ -505,9 +508,11 @@ func (es *expressLaneService) syncFromRedis() {
roundInfo.sequence = redisSeqCount
}
es.roundInfo.Add(currentRound, roundInfo)
sequenceCount := roundInfo.sequence
es.roundInfoMutex.Unlock()

pendingMsgs := es.redisCoordinator.GetAcceptedTxs(currentRound, roundInfo.sequence)
pendingMsgs := es.redisCoordinator.GetAcceptedTxs(currentRound, sequenceCount, sequenceCount+es.seqConfig().Dangerous.Timeboost.MaxFutureSequenceDistance)
log.Info("Attempting to sequence pending expressLane transactions from redis", "count", len(pendingMsgs))
for _, msg := range pendingMsgs {
es.LaunchThread(func(ctx context.Context) {
if err := es.sequenceExpressLaneSubmission(ctx, msg); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions execution/gethexec/express_lane_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_duplicateNonce(t *tes
els := &expressLaneService{
roundInfo: containers.NewLruCache[uint64, *expressLaneRoundInfo](8),
roundTimingInfo: defaultTestRoundTimingInfo(time.Now()),
seqConfig: func() *SequencerConfig { return &SequencerConfig{} },
seqConfig: func() *SequencerConfig { return &DefaultSequencerConfig },
}
var err error
els.redisCoordinator, err = timeboost.NewRedisCoordinator(redisUrl, els.roundTimingInfo.Round)
Expand Down Expand Up @@ -357,7 +357,7 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_outOfOrder(t *testing
els := &expressLaneService{
roundInfo: containers.NewLruCache[uint64, *expressLaneRoundInfo](8),
roundTimingInfo: defaultTestRoundTimingInfo(time.Now()),
seqConfig: func() *SequencerConfig { return &SequencerConfig{} },
seqConfig: func() *SequencerConfig { return &DefaultSequencerConfig },
}
var err error
els.redisCoordinator, err = timeboost.NewRedisCoordinator(redisUrl, els.roundTimingInfo.Round)
Expand Down Expand Up @@ -454,7 +454,7 @@ func Test_expressLaneService_syncFromRedis(t *testing.T) {
els1 := &expressLaneService{
roundInfo: containers.NewLruCache[uint64, *expressLaneRoundInfo](8),
roundTimingInfo: defaultTestRoundTimingInfo(time.Now()),
seqConfig: func() *SequencerConfig { return &SequencerConfig{} },
seqConfig: func() *SequencerConfig { return &DefaultSequencerConfig },
}
var err error
els1.redisCoordinator, err = timeboost.NewRedisCoordinator(redisUrl, els1.roundTimingInfo.Round)
Expand Down Expand Up @@ -496,7 +496,7 @@ func Test_expressLaneService_syncFromRedis(t *testing.T) {
els2 := &expressLaneService{
roundInfo: containers.NewLruCache[uint64, *expressLaneRoundInfo](8),
roundTimingInfo: defaultTestRoundTimingInfo(time.Now()),
seqConfig: func() *SequencerConfig { return &SequencerConfig{} },
seqConfig: func() *SequencerConfig { return &DefaultSequencerConfig },
}
els2.redisCoordinator, err = timeboost.NewRedisCoordinator(redisUrl, els2.roundTimingInfo.Round)
require.NoError(t, err)
Expand Down
38 changes: 22 additions & 16 deletions execution/gethexec/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,25 +89,27 @@ type DangerousConfig struct {
}

type TimeboostConfig struct {
Enable bool `koanf:"enable"`
AuctionContractAddress string `koanf:"auction-contract-address"`
AuctioneerAddress string `koanf:"auctioneer-address"`
ExpressLaneAdvantage time.Duration `koanf:"express-lane-advantage"`
SequencerHTTPEndpoint string `koanf:"sequencer-http-endpoint"`
EarlySubmissionGrace time.Duration `koanf:"early-submission-grace"`
MaxQueuedTxCount int `koanf:"max-queued-tx-count"`
RedisUrl string `koanf:"redis-url"`
Enable bool `koanf:"enable"`
AuctionContractAddress string `koanf:"auction-contract-address"`
AuctioneerAddress string `koanf:"auctioneer-address"`
ExpressLaneAdvantage time.Duration `koanf:"express-lane-advantage"`
SequencerHTTPEndpoint string `koanf:"sequencer-http-endpoint"`
EarlySubmissionGrace time.Duration `koanf:"early-submission-grace"`
MaxQueuedTxCount int `koanf:"max-queued-tx-count"`
MaxFutureSequenceDistance uint64 `koanf:"max-future-sequence-distance"`
RedisUrl string `koanf:"redis-url"`
}

var DefaultTimeboostConfig = TimeboostConfig{
Enable: false,
AuctionContractAddress: "",
AuctioneerAddress: "",
ExpressLaneAdvantage: time.Millisecond * 200,
SequencerHTTPEndpoint: "http://localhost:8547",
EarlySubmissionGrace: time.Second * 2,
MaxQueuedTxCount: 10,
RedisUrl: "unset",
Enable: false,
AuctionContractAddress: "",
AuctioneerAddress: "",
ExpressLaneAdvantage: time.Millisecond * 200,
SequencerHTTPEndpoint: "http://localhost:8547",
EarlySubmissionGrace: time.Second * 2,
MaxQueuedTxCount: 10,
MaxFutureSequenceDistance: 100,
RedisUrl: "unset",
}

func (c *SequencerConfig) Validate() error {
Expand Down Expand Up @@ -152,6 +154,9 @@ func (c *TimeboostConfig) Validate() error {
if len(c.AuctioneerAddress) > 0 && !common.IsHexAddress(c.AuctioneerAddress) {
return fmt.Errorf("invalid timeboost.auctioneer-address \"%v\"", c.AuctioneerAddress)
}
if c.MaxFutureSequenceDistance == 0 {
return errors.New("timeboost max-future-sequence-distance option cannot be zero, it should be set to a positive value")
}
return nil
}

Expand Down Expand Up @@ -210,6 +215,7 @@ func TimeboostAddOptions(prefix string, f *flag.FlagSet) {
f.String(prefix+".sequencer-http-endpoint", DefaultTimeboostConfig.SequencerHTTPEndpoint, "this sequencer's http endpoint")
f.Duration(prefix+".early-submission-grace", DefaultTimeboostConfig.EarlySubmissionGrace, "period of time before the next round where submissions for the next round will be queued")
f.Int(prefix+".max-queued-tx-count", DefaultTimeboostConfig.MaxQueuedTxCount, "maximum allowed number of express lane txs with future sequence number to be queued. Set 0 to disable this check and a negative value to prevent queuing of any future sequence number transactions")
f.Uint64(prefix+".max-future-sequence-distance", DefaultTimeboostConfig.MaxFutureSequenceDistance, "maximum allowed difference (in terms of sequence numbers) between a future express lane tx and the current sequence count of a round")
f.String(prefix+".redis-url", DefaultTimeboostConfig.RedisUrl, "the Redis URL for expressLaneService to coordinate via")
}

Expand Down
7 changes: 4 additions & 3 deletions system_tests/timeboost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1293,9 +1293,10 @@ func setupExpressLaneAuction(
builderSeq.nodeConfig.SeqCoordinator.DeleteFinalizedMsgs = false
builderSeq.execConfig.Sequencer.Enable = true
builderSeq.execConfig.Sequencer.Dangerous.Timeboost = gethexec.TimeboostConfig{
Enable: false, // We need to start without timeboost initially to create the auction contract
ExpressLaneAdvantage: time.Second * 5,
RedisUrl: expressLaneRedisURL,
Enable: false, // We need to start without timeboost initially to create the auction contract
ExpressLaneAdvantage: time.Second * 5,
RedisUrl: expressLaneRedisURL,
MaxFutureSequenceDistance: 1500, // Required for TestExpressLaneTransactionHandlingComplex
}
builderSeq.nodeConfig.TransactionStreamer.TrackBlockMetadataFrom = 1
cleanupSeq := builderSeq.Build(t)
Expand Down
35 changes: 7 additions & 28 deletions timeboost/redis_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -89,50 +87,31 @@ func acceptedTxKeyFor(round, seqNum uint64) string {
return fmt.Sprintf("%s%d.%d", EXPRESS_LANE_ACCEPTED_TX_KEY_PREFIX, round, seqNum)
}

func (rc *RedisCoordinator) GetAcceptedTxs(round, startSeqNum uint64) []*ExpressLaneSubmission {
func (rc *RedisCoordinator) GetAcceptedTxs(round, startSeqNum, endSeqNum uint64) []*ExpressLaneSubmission {
ctx := rc.GetContext()
fetchMsg := func(key string) *ExpressLaneSubmission {
msgBytes, err := rc.client.Get(ctx, key).Bytes()
if err != nil {
log.Error("Error fetching accepted expressLane tx", "err", err)
log.Error("Error fetching accepted expressLane tx", "key", key, "err", err)
return nil
}
msgJson := JsonExpressLaneSubmission{}
if err := json.Unmarshal(msgBytes, &msgJson); err != nil {
log.Error("Error unmarshalling", "err", err)
log.Error("Error unmarshalling", "key", key, "err", err)
return nil
}
msg, err := JsonSubmissionToGo(&msgJson)
if err != nil {
log.Error("Error converting JsonExpressLaneSubmission to ExpressLaneSubmission", "err", err)
log.Error("Error converting JsonExpressLaneSubmission to ExpressLaneSubmission", "key", key, "err", err)
return nil
}
return msg
}

var msgs []*ExpressLaneSubmission
prefix := fmt.Sprintf("%s%d.", EXPRESS_LANE_ACCEPTED_TX_KEY_PREFIX, round)
cursor := uint64(0)
for {
keys, cursor, err := rc.client.Scan(ctx, cursor, prefix+"*", 0).Result()
if err != nil {
break // Best effort
}
for _, key := range keys {
seq, err := strconv.Atoi(strings.TrimPrefix(key, prefix))
if err != nil {
log.Error("Error getting sequence number from the redis key of accepted timeboost Tx", "key", key, "error", err)
continue
}
// #nosec G115
if uint64(seq) >= startSeqNum {
if msg := fetchMsg(key); msg != nil {
msgs = append(msgs, msg)
}
}
}
if cursor == 0 {
break
for seq := startSeqNum; seq <= endSeqNum; seq++ {
if msg := fetchMsg(acceptedTxKeyFor(round, seq)); msg != nil {
msgs = append(msgs, msg)
}
}
return msgs
Expand Down
2 changes: 1 addition & 1 deletion timeboost/redis_coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestRedisSeqCoordinatorAtomic(t *testing.T) {
}

checkCorrectness := func(startSeqNum uint64) {
fetchedMsgs := redisCoordinator.GetAcceptedTxs(round, startSeqNum)
fetchedMsgs := redisCoordinator.GetAcceptedTxs(round, startSeqNum, startSeqNum+5)
if len(fetchedMsgs) != len(addedMsgs[startSeqNum:]) {
t.Fatal("mismatch in number of fetched msgs")
}
Expand Down

0 comments on commit 4d43771

Please sign in to comment.