Skip to content

Commit

Permalink
refactor: batcher to use errgroup for da instead of separate semaphor…
Browse files Browse the repository at this point in the history
…e/waitgroup
  • Loading branch information
samlaf committed Aug 30, 2024
1 parent 526ad68 commit c7f96cc
Showing 1 changed file with 20 additions and 20 deletions.
40 changes: 20 additions & 20 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"golang.org/x/sync/semaphore"
"golang.org/x/sync/errgroup"
)

var (
Expand Down Expand Up @@ -300,8 +300,8 @@ func (l *BatchSubmitter) loop() {

receiptsCh := make(chan txmgr.TxReceipt[txRef])
queue := txmgr.NewQueue[txRef](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions)
daWaitGroup := &sync.WaitGroup{}
daSemaphore := semaphore.NewWeighted(int64(l.Config.MaxConcurrentDARequests))
daGroup := &errgroup.Group{}
daGroup.SetLimit(int(l.Config.MaxConcurrentDARequests))

// start the receipt/result processing loop
receiptLoopDone := make(chan struct{})
Expand Down Expand Up @@ -337,10 +337,10 @@ func (l *BatchSubmitter) loop() {
defer ticker.Stop()

publishAndWait := func() {
l.publishStateToL1(queue, receiptsCh, daWaitGroup, daSemaphore)
l.publishStateToL1(queue, receiptsCh, daGroup)
if !l.Txmgr.IsClosed() {
l.Log.Info("Wait for pure DA writes, not L1 txs")
daWaitGroup.Wait()
daGroup.Wait()
l.Log.Info("Wait for L1 writes (blobs or DA commitments)")
queue.Wait()
} else {
Expand Down Expand Up @@ -375,7 +375,7 @@ func (l *BatchSubmitter) loop() {
l.clearState(l.shutdownCtx)
continue
}
l.publishStateToL1(queue, receiptsCh, daWaitGroup, daSemaphore)
l.publishStateToL1(queue, receiptsCh, daGroup)
case <-l.shutdownCtx.Done():
if l.Txmgr.IsClosed() {
l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
Expand Down Expand Up @@ -432,14 +432,14 @@ func (l *BatchSubmitter) waitNodeSync() error {

// publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is
// no more data to queue for publishing or if there was an error queing the data.
func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daWaitGroup *sync.WaitGroup, daSemaphore *semaphore.Weighted) {
func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) {
for {
// if the txmgr is closed, we stop the transaction sending
if l.Txmgr.IsClosed() {
l.Log.Info("Txmgr is closed, aborting state publishing")
return
}
err := l.publishTxToL1(l.killCtx, queue, receiptsCh, daWaitGroup, daSemaphore)
err := l.publishTxToL1(l.killCtx, queue, receiptsCh, daGroup)
if err != nil {
if err != io.EOF {
l.Log.Error("Error publishing tx to l1", "err", err)
Expand Down Expand Up @@ -489,10 +489,11 @@ func (l *BatchSubmitter) clearState(ctx context.Context) {
}

// publishTxToL1 submits a single state tx to the L1
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daWaitGroup *sync.WaitGroup, daSemaphore *semaphore.Weighted) error {
// Acquire semaphore or return immediately if it is at capacity
// we don't want to pull data out of the state if we won't be able to send it
if l.Config.UseAltDA && !daSemaphore.TryAcquire(1) {
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) error {
// We use TryGo with an empty function similar to semaphore.TryAcquire()
// if we can't acquire a slot we return io.EOF to stop processing the current batch
// because we don't want to pull data out of the state if we won't be able to send it
if l.Config.UseAltDA && !daGroup.TryGo(func() error { return nil }) {
return io.EOF
}

Expand All @@ -515,7 +516,7 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t
return err
}

if err = l.sendTransaction(ctx, txdata, queue, receiptsCh, daWaitGroup, daSemaphore); err != nil {
if err = l.sendTransaction(ctx, txdata, queue, receiptsCh, daGroup); err != nil {
return fmt.Errorf("BatchSubmitter.sendTransaction failed: %w", err)
}
return nil
Expand Down Expand Up @@ -562,7 +563,7 @@ func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txRef], receiptsCh

// sendTransaction creates & queues for sending a transaction to the batch inbox address with the given `txData`.
// The method will block if the queue's MaxPendingTransactions is exceeded.
func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daWaitGroup *sync.WaitGroup, daSemaphore *semaphore.Weighted) error {
func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) error {
var err error

// if Alt DA is enabled we post the txdata to the DA Provider and replace it with the commitment.
Expand All @@ -577,22 +578,21 @@ func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, que

// when posting txdata to an external DA Provider, we use a goroutine to avoid blocking the main loop
// since it may take a while for the request to return.
daWaitGroup.Add(1)
go func() {
defer daWaitGroup.Done()
defer daSemaphore.Release(1)
// the number of concurrent goroutines is limited by the semaphore check (daGroup.TryAcquire(1)) in publishTxToL1
daGroup.Go(func() error {
comm, err := l.AltDA.SetInput(ctx, txdata.CallData())
if err != nil {
l.Log.Error("Failed to post input to Alt DA", "error", err)
// requeue frame if we fail to post to the DA Provider so it can be retried
// note: this assumes that the da server caches requests, otherwise it might lead to resubmissions of the blobs
l.recordFailedTx(txdata.ID(), err)
return
return nil
}
l.Log.Info("Set altda input", "commitment", comm, "tx", txdata.ID())
candidate := l.calldataTxCandidate(comm.TxData())
l.queueTx(txdata, false, candidate, queue, receiptsCh)
}()
return nil
})
// we return nil to allow publishStateToL1 to keep processing the next txdata
return nil
}
Expand Down

0 comments on commit c7f96cc

Please sign in to comment.