Skip to content

Commit

Permalink
reduce roundInfo lock contention in syncFromRedis
Browse files Browse the repository at this point in the history
  • Loading branch information
ganeshvanahalli committed Jan 24, 2025
1 parent 40e8c19 commit c300fd6
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 20 deletions.
26 changes: 7 additions & 19 deletions execution/gethexec/express_lane_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,27 +491,15 @@ func (es *expressLaneService) syncFromRedis() {
roundInfo.sequence = redisSeqCount
}

var msgReadyForSequencing *timeboost.ExpressLaneSubmission
pendingMsgs := es.redisCoordinator.GetAcceptedTxs(currentRound, roundInfo.sequence)
for _, msg := range pendingMsgs {
// If we get a msg that can be readily sequenced, don't add it to the map
// instead sequence it right after we finish updating the map with rest of the msgs
if msg.SequenceNumber == roundInfo.sequence {
msgReadyForSequencing = msg
} else {
roundInfo.msgAndResultBySequenceNumber[msg.SequenceNumber] = &msgAndResult{
msg: msg,
resultChan: make(chan error, 1), // will never be read from, but required for sequencing of this msg
}
}
}

es.roundInfo.Add(currentRound, roundInfo)
es.roundInfoMutex.Unlock()

if msgReadyForSequencing != nil {
if err := es.sequenceExpressLaneSubmission(es.GetContext(), msgReadyForSequencing); err != nil {
log.Error("Untracked expressLaneSubmission returned an error", "round", msgReadyForSequencing.Round, "seqNum", msgReadyForSequencing.SequenceNumber, "txHash", msgReadyForSequencing.Transaction.Hash(), "err", err)
}
pendingMsgs := es.redisCoordinator.GetAcceptedTxs(currentRound, roundInfo.sequence)
for _, msg := range pendingMsgs {
es.LaunchThread(func(ctx context.Context) {
if err := es.sequenceExpressLaneSubmission(ctx, msg); err != nil {
log.Error("Untracked expressLaneSubmission returned an error", "round", msg.Round, "seqNum", msg.SequenceNumber, "txHash", msg.Transaction.Hash(), "err", err)
}
})
}
}
3 changes: 2 additions & 1 deletion execution/gethexec/express_lane_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ func Test_expressLaneService_syncFromRedis(t *testing.T) {
// Only one tx out of the three should have been processed
require.Equal(t, 1, len(stubPublisher1.publishedTxOrder))

time.Sleep(time.Second) // wait for untracked redis update threads to complete
time.Sleep(time.Second) // wait for parallel redis update threads to complete

els2 := &expressLaneService{
roundInfo: containers.NewLruCache[uint64, *expressLaneRoundInfo](8),
Expand All @@ -513,6 +513,7 @@ func Test_expressLaneService_syncFromRedis(t *testing.T) {

// As els2 becomes an active sequencer, syncFromRedis would be called when Activate() function of sequencer is invoked
els2.syncFromRedis()
time.Sleep(time.Second) // wait for parallel sequencing of redis txs to complete

els2.roundInfoMutex.Lock()
roundInfo, exists := els2.roundInfo.Get(0)
Expand Down

0 comments on commit c300fd6

Please sign in to comment.