Skip to content

Commit

Permalink
fix(batcher): sync_actions data race
Browse files Browse the repository at this point in the history
computeSyncActions function of main goroutine is racing with the tx processing goroutine because it calls the channel functions directly without passing by the channelManager (which holds the lock). Fixed by passing channelManager lock to computeSyncActions
  • Loading branch information
samlaf committed Dec 10, 2024
1 parent 3cbe31f commit 0a8bbe4
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 3 deletions.
2 changes: 1 addition & 1 deletion op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ func (l *BatchSubmitter) mainLoop(ctx context.Context, receiptsCh chan txmgr.TxR
}

// Decide appropriate actions
syncActions, outOfSync := computeSyncActions(*syncStatus, l.prevCurrentL1, l.state.blocks, l.state.channelQueue, l.Log)
syncActions, outOfSync := computeSyncActions(*syncStatus, l.prevCurrentL1, l.state.blocks, l.state.channelQueue, &l.state.mu, l.Log)

if outOfSync {
// If the sequencer is out of sync
Expand Down
5 changes: 4 additions & 1 deletion op-batcher/batcher/sync_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package batcher

import (
"fmt"
"sync"

"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/queue"
Expand Down Expand Up @@ -34,7 +35,7 @@ func (s syncActions) String() string {
// state of the batcher (blocks and channels), the new sync status, and the previous current L1 block. The actions are returned
// in a struct specifying the number of blocks to prune, the number of channels to prune, whether to wait for node sync, the block
// range to load into the local state, and whether to clear the state entirely. Returns an boolean indicating if the sequencer is out of sync.
func computeSyncActions[T channelStatuser](newSyncStatus eth.SyncStatus, prevCurrentL1 eth.L1BlockRef, blocks queue.Queue[*types.Block], channels []T, l log.Logger) (syncActions, bool) {
func computeSyncActions[T channelStatuser](newSyncStatus eth.SyncStatus, prevCurrentL1 eth.L1BlockRef, blocks queue.Queue[*types.Block], channels []T, channelsMu *sync.Mutex, l log.Logger) (syncActions, bool) {

// PART 1: Initial checks on the sync status
if newSyncStatus.HeadL1 == (eth.L1BlockRef{}) {
Expand Down Expand Up @@ -111,6 +112,8 @@ func computeSyncActions[T channelStatuser](newSyncStatus eth.SyncStatus, prevCur
}

// PART 4: checks involving channels
channelsMu.Lock()
defer channelsMu.Unlock()
for _, ch := range channels {
if ch.isFullySubmitted() &&
!ch.isTimedOut() &&
Expand Down
3 changes: 2 additions & 1 deletion op-batcher/batcher/sync_actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package batcher

import (
"math/big"
"sync"
"testing"

"github.com/ethereum-optimism/optimism/op-service/eth"
Expand Down Expand Up @@ -283,7 +284,7 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
l, h := testlog.CaptureLogger(t, log.LevelDebug)

result, outOfSync := computeSyncActions(
tc.newSyncStatus, tc.prevCurrentL1, tc.blocks, tc.channels, l,
tc.newSyncStatus, tc.prevCurrentL1, tc.blocks, tc.channels, &sync.Mutex{}, l,
)

require.Equal(t, tc.expected, result, "unexpected actions")
Expand Down

0 comments on commit 0a8bbe4

Please sign in to comment.