diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index e2ebae7447a73..47cf76b274a04 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -23,7 +23,7 @@ import ( "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" - "golang.org/x/sync/semaphore" + "golang.org/x/sync/errgroup" ) var ( @@ -300,8 +300,8 @@ func (l *BatchSubmitter) loop() { receiptsCh := make(chan txmgr.TxReceipt[txRef]) queue := txmgr.NewQueue[txRef](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions) - daWaitGroup := &sync.WaitGroup{} - daSemaphore := semaphore.NewWeighted(int64(l.Config.MaxConcurrentDARequests)) + daGroup := &errgroup.Group{} + daGroup.SetLimit(int(l.Config.MaxConcurrentDARequests)) // start the receipt/result processing loop receiptLoopDone := make(chan struct{}) @@ -337,10 +337,10 @@ func (l *BatchSubmitter) loop() { defer ticker.Stop() publishAndWait := func() { - l.publishStateToL1(queue, receiptsCh, daWaitGroup, daSemaphore) + l.publishStateToL1(queue, receiptsCh, daGroup) if !l.Txmgr.IsClosed() { l.Log.Info("Wait for pure DA writes, not L1 txs") - daWaitGroup.Wait() + daGroup.Wait() l.Log.Info("Wait for L1 writes (blobs or DA commitments)") queue.Wait() } else { @@ -375,7 +375,7 @@ func (l *BatchSubmitter) loop() { l.clearState(l.shutdownCtx) continue } - l.publishStateToL1(queue, receiptsCh, daWaitGroup, daSemaphore) + l.publishStateToL1(queue, receiptsCh, daGroup) case <-l.shutdownCtx.Done(): if l.Txmgr.IsClosed() { l.Log.Info("Txmgr is closed, remaining channel data won't be sent") @@ -432,14 +432,14 @@ func (l *BatchSubmitter) waitNodeSync() error { // publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is // no more data to queue for publishing or if there was an error queing the data. 
-func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daWaitGroup *sync.WaitGroup, daSemaphore *semaphore.Weighted) { +func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) { for { // if the txmgr is closed, we stop the transaction sending if l.Txmgr.IsClosed() { l.Log.Info("Txmgr is closed, aborting state publishing") return } - err := l.publishTxToL1(l.killCtx, queue, receiptsCh, daWaitGroup, daSemaphore) + err := l.publishTxToL1(l.killCtx, queue, receiptsCh, daGroup) if err != nil { if err != io.EOF { l.Log.Error("Error publishing tx to l1", "err", err) @@ -489,10 +489,11 @@ func (l *BatchSubmitter) clearState(ctx context.Context) { } // publishTxToL1 submits a single state tx to the L1 -func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daWaitGroup *sync.WaitGroup, daSemaphore *semaphore.Weighted) error { - // Acquire semaphore or return immediately if it is at capacity - // we don't want to pull data out of the state if we won't be able to send it - if l.Config.UseAltDA && !daSemaphore.TryAcquire(1) { +func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) error { + // We use TryGo with an empty function similar to semaphore.TryAcquire() + // if we can't acquire a slot we return io.EOF to stop processing the current batch + // because we don't want to pull data out of the state if we won't be able to send it + if l.Config.UseAltDA && !daGroup.TryGo(func() error { return nil }) { return io.EOF } @@ -515,7 +516,7 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t return err } - if err = l.sendTransaction(ctx, txdata, queue, receiptsCh, daWaitGroup, daSemaphore); err != nil { + if err = l.sendTransaction(ctx, txdata, queue, 
receiptsCh, daGroup); err != nil { return fmt.Errorf("BatchSubmitter.sendTransaction failed: %w", err) } return nil @@ -562,7 +563,7 @@ func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txRef], receiptsCh // sendTransaction creates & queues for sending a transaction to the batch inbox address with the given `txData`. // The method will block if the queue's MaxPendingTransactions is exceeded. -func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daWaitGroup *sync.WaitGroup, daSemaphore *semaphore.Weighted) error { +func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) error { var err error // if Alt DA is enabled we post the txdata to the DA Provider and replace it with the commitment. @@ -577,22 +578,21 @@ func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, que // when posting txdata to an external DA Provider, we use a goroutine to avoid blocking the main loop // since it may take a while for the request to return. 
- daWaitGroup.Add(1) - go func() { - defer daWaitGroup.Done() - defer daSemaphore.Release(1) + // the number of concurrent goroutines is limited by the semaphore-style check (daGroup.TryGo) in publishTxToL1 + daGroup.Go(func() error { comm, err := l.AltDA.SetInput(ctx, txdata.CallData()) if err != nil { l.Log.Error("Failed to post input to Alt DA", "error", err) // requeue frame if we fail to post to the DA Provider so it can be retried // note: this assumes that the da server caches requests, otherwise it might lead to resubmissions of the blobs l.recordFailedTx(txdata.ID(), err) - return + return nil } l.Log.Info("Set altda input", "commitment", comm, "tx", txdata.ID()) candidate := l.calldataTxCandidate(comm.TxData()) l.queueTx(txdata, false, candidate, queue, receiptsCh) - }() + return nil + }) // we return nil to allow publishStateToL1 to keep processing the next txdata return nil }