diff --git a/op-alt-da/cli.go b/op-alt-da/cli.go index e931707b67f50..30ce2168f5702 100644 --- a/op-alt-da/cli.go +++ b/op-alt-da/cli.go @@ -3,15 +3,19 @@ package altda import ( "fmt" "net/url" + "time" "github.com/urfave/cli/v2" ) var ( - EnabledFlagName = altDAFlags("enabled") - DaServerAddressFlagName = altDAFlags("da-server") - VerifyOnReadFlagName = altDAFlags("verify-on-read") - DaServiceFlag = altDAFlags("da-service") + EnabledFlagName = altDAFlags("enabled") + DaServerAddressFlagName = altDAFlags("da-server") + VerifyOnReadFlagName = altDAFlags("verify-on-read") + DaServiceFlagName = altDAFlags("da-service") + PutTimeoutFlagName = altDAFlags("put-timeout") + GetTimeoutFlagName = altDAFlags("get-timeout") + MaxConcurrentRequestsFlagName = altDAFlags("max-concurrent-da-requests") ) // altDAFlags returns the flag names for altDA @@ -46,20 +50,41 @@ func CLIFlags(envPrefix string, category string) []cli.Flag { Category: category, }, &cli.BoolFlag{ - Name: DaServiceFlag, + Name: DaServiceFlagName, Usage: "Use DA service type where commitments are generated by Alt-DA server", Value: false, EnvVars: altDAEnvs(envPrefix, "DA_SERVICE"), Category: category, }, + &cli.DurationFlag{ + Name: PutTimeoutFlagName, + Usage: "Timeout for put requests. 0 means no timeout.", + Value: time.Duration(0), + EnvVars: altDAEnvs(envPrefix, "PUT_TIMEOUT"), + }, + &cli.DurationFlag{ + Name: GetTimeoutFlagName, + Usage: "Timeout for get requests. 0 means no timeout.", + Value: time.Duration(0), + EnvVars: altDAEnvs(envPrefix, "GET_TIMEOUT"), + }, + &cli.Uint64Flag{ + Name: MaxConcurrentRequestsFlagName, + Usage: "Maximum number of concurrent requests to the DA server", + Value: 1, + EnvVars: altDAEnvs(envPrefix, "MAX_CONCURRENT_DA_REQUESTS"), + }, } } type CLIConfig struct { - Enabled bool - DAServerURL string - VerifyOnRead bool - GenericDA bool + Enabled bool + DAServerURL string + VerifyOnRead bool + GenericDA bool + PutTimeout time.Duration + GetTimeout time.Duration + MaxConcurrentRequests uint64 } func (c CLIConfig) Check() error { @@ -75,14 +100,17 @@ func (c CLIConfig) Check() error { } func (c CLIConfig) NewDAClient() *DAClient { - return &DAClient{url: c.DAServerURL, verify: c.VerifyOnRead, precompute: !c.GenericDA} + return &DAClient{url: c.DAServerURL, verify: c.VerifyOnRead, precompute: !c.GenericDA, getTimeout: c.GetTimeout, putTimeout: c.PutTimeout} } func ReadCLIConfig(c *cli.Context) CLIConfig { return CLIConfig{ - Enabled: c.Bool(EnabledFlagName), - DAServerURL: c.String(DaServerAddressFlagName), - VerifyOnRead: c.Bool(VerifyOnReadFlagName), - GenericDA: c.Bool(DaServiceFlag), + Enabled: c.Bool(EnabledFlagName), + DAServerURL: c.String(DaServerAddressFlagName), + VerifyOnRead: c.Bool(VerifyOnReadFlagName), + GenericDA: c.Bool(DaServiceFlagName), + PutTimeout: c.Duration(PutTimeoutFlagName), + GetTimeout: c.Duration(GetTimeoutFlagName), + MaxConcurrentRequests: c.Uint64(MaxConcurrentRequestsFlagName), } } diff --git a/op-alt-da/daclient.go b/op-alt-da/daclient.go index db9c66ce5c216..269b71f3c1043 100644 --- a/op-alt-da/daclient.go +++ b/op-alt-da/daclient.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "net/http" + "time" ) // ErrNotFound is returned when the server could not find the input. 
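As a rough usage sketch of the new flags above (not part of the change itself): the put/get timeouts and the concurrency limit flow through `ReadCLIConfig` into `CLIConfig` and then into the `DAClient`. The snippet below shows one hypothetical way to wire them into a urfave/cli app; the app name and the "EXAMPLE" env prefix / "ALTDA" category strings are placeholders, not values used by the batcher.

```go
// Hypothetical standalone wiring of the op-alt-da CLI flags; illustrative only.
package main

import (
	"log"
	"os"

	altda "github.com/ethereum-optimism/optimism/op-alt-da"
	"github.com/urfave/cli/v2"
)

func main() {
	app := &cli.App{
		Name:  "altda-flags-example", // placeholder name
		Flags: altda.CLIFlags("EXAMPLE", "ALTDA"),
		Action: func(ctx *cli.Context) error {
			cfg := altda.ReadCLIConfig(ctx)
			if err := cfg.Check(); err != nil {
				return err
			}
			// Zero-value timeouts mean the underlying HTTP client applies no timeout.
			client := cfg.NewDAClient()
			_ = client // use client.GetInput / SetInput as needed
			return nil
		},
	}
	if err := app.Run(os.Args); err != nil {
		log.Fatal(err)
	}
}
```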
@@ -23,10 +24,16 @@ type DAClient struct { verify bool // whether commitment is precomputable (only applicable to keccak256) precompute bool + getTimeout time.Duration + putTimeout time.Duration } func NewDAClient(url string, verify bool, pc bool) *DAClient { - return &DAClient{url, verify, pc} + return &DAClient{ + url: url, + verify: verify, + precompute: pc, + } } // GetInput returns the input data for the given encoded commitment bytes. @@ -35,7 +42,8 @@ func (c *DAClient) GetInput(ctx context.Context, comm CommitmentData) ([]byte, e if err != nil { return nil, fmt.Errorf("failed to create HTTP request: %w", err) } - resp, err := http.DefaultClient.Do(req) + client := &http.Client{Timeout: c.getTimeout} + resp, err := client.Do(req) if err != nil { return nil, err } @@ -91,7 +99,8 @@ func (c *DAClient) setInputWithCommit(ctx context.Context, comm CommitmentData, return fmt.Errorf("failed to create HTTP request: %w", err) } req.Header.Set("Content-Type", "application/octet-stream") - resp, err := http.DefaultClient.Do(req) + client := &http.Client{Timeout: c.putTimeout} + resp, err := client.Do(req) if err != nil { return err } @@ -116,7 +125,8 @@ func (c *DAClient) setInput(ctx context.Context, img []byte) (CommitmentData, er return nil, fmt.Errorf("failed to create HTTP request: %w", err) } req.Header.Set("Content-Type", "application/octet-stream") - resp, err := http.DefaultClient.Do(req) + client := &http.Client{Timeout: c.putTimeout} + resp, err := client.Do(req) if err != nil { return nil, err } diff --git a/op-alt-da/daclient_test.go b/op-alt-da/daclient_test.go index 02a9611ae276d..d9f7902aadee1 100644 --- a/op-alt-da/daclient_test.go +++ b/op-alt-da/daclient_test.go @@ -2,48 +2,14 @@ package altda import ( "context" - "fmt" "math/rand" - "sync" "testing" "github.com/ethereum-optimism/optimism/op-service/testlog" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" "github.com/stretchr/testify/require" ) -type MemStore struct { - db map[string][]byte - lock sync.RWMutex -} - -func NewMemStore() *MemStore { - return &MemStore{ - db: make(map[string][]byte), - } -} - -// Get retrieves the given key if it's present in the key-value store. -func (s *MemStore) Get(ctx context.Context, key []byte) ([]byte, error) { - s.lock.RLock() - defer s.lock.RUnlock() - - if entry, ok := s.db[string(key)]; ok { - return common.CopyBytes(entry), nil - } - return nil, ErrNotFound -} - -// Put inserts the given value into the key-value store. 
-func (s *MemStore) Put(ctx context.Context, key []byte, value []byte) error { - s.lock.Lock() - defer s.lock.Unlock() - - s.db[string(key)] = common.CopyBytes(value) - return nil -} - func TestDAClientPrecomputed(t *testing.T) { store := NewMemStore() logger := testlog.Logger(t, log.LevelDebug) @@ -56,7 +22,7 @@ func TestDAClientPrecomputed(t *testing.T) { cfg := CLIConfig{ Enabled: true, - DAServerURL: fmt.Sprintf("http://%s", server.Endpoint()), + DAServerURL: server.HttpEndpoint(), VerifyOnRead: true, } require.NoError(t, cfg.Check()) @@ -113,7 +79,7 @@ func TestDAClientService(t *testing.T) { cfg := CLIConfig{ Enabled: true, - DAServerURL: fmt.Sprintf("http://%s", server.Endpoint()), + DAServerURL: server.HttpEndpoint(), VerifyOnRead: false, GenericDA: false, } diff --git a/op-alt-da/damock.go b/op-alt-da/damock.go index b56b73fdfcc90..0db129171a822 100644 --- a/op-alt-da/damock.go +++ b/op-alt-da/damock.go @@ -4,8 +4,12 @@ import ( "context" "errors" "io" + "net/http" + "sync" + "time" "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb/memorydb" "github.com/ethereum/go-ethereum/log" @@ -99,3 +103,84 @@ func (d *AltDADisabled) OnFinalizedHeadSignal(f HeadSignalFn) { func (d *AltDADisabled) AdvanceL1Origin(ctx context.Context, l1 L1Fetcher, blockId eth.BlockID) error { return ErrNotEnabled } + +// FakeDAServer is a fake DA server for e2e tests. +// It is a small wrapper around DAServer that allows for setting request latencies, +// to mimic a DA service with slow responses (eg. eigenDA with 10 min batching interval). +type FakeDAServer struct { + *DAServer + putRequestLatency time.Duration + getRequestLatency time.Duration +} + +func NewFakeDAServer(host string, port int, log log.Logger) *FakeDAServer { + store := NewMemStore() + fakeDAServer := &FakeDAServer{ + DAServer: NewDAServer(host, port, store, log, true), + putRequestLatency: 0, + getRequestLatency: 0, + } + return fakeDAServer +} + +func (s *FakeDAServer) HandleGet(w http.ResponseWriter, r *http.Request) { + time.Sleep(s.getRequestLatency) + s.DAServer.HandleGet(w, r) +} + +func (s *FakeDAServer) HandlePut(w http.ResponseWriter, r *http.Request) { + time.Sleep(s.putRequestLatency) + s.DAServer.HandlePut(w, r) +} + +func (s *FakeDAServer) Start() error { + err := s.DAServer.Start() + if err != nil { + return err + } + // Override the HandleGet/Put method registrations + mux := http.NewServeMux() + mux.HandleFunc("/get/", s.HandleGet) + mux.HandleFunc("/put/", s.HandlePut) + s.httpServer.Handler = mux + return nil +} + +func (s *FakeDAServer) SetPutRequestLatency(latency time.Duration) { + s.putRequestLatency = latency +} + +func (s *FakeDAServer) SetGetRequestLatency(latency time.Duration) { + s.getRequestLatency = latency +} + +type MemStore struct { + db map[string][]byte + lock sync.RWMutex +} + +func NewMemStore() *MemStore { + return &MemStore{ + db: make(map[string][]byte), + } +} + +// Get retrieves the given key if it's present in the key-value store. +func (s *MemStore) Get(ctx context.Context, key []byte) ([]byte, error) { + s.lock.RLock() + defer s.lock.RUnlock() + + if entry, ok := s.db[string(key)]; ok { + return common.CopyBytes(entry), nil + } + return nil, ErrNotFound +} + +// Put inserts the given value into the key-value store. 
+func (s *MemStore) Put(ctx context.Context, key []byte, value []byte) error { + s.lock.Lock() + defer s.lock.Unlock() + + s.db[string(key)] = common.CopyBytes(value) + return nil +} diff --git a/op-alt-da/daserver.go b/op-alt-da/daserver.go index ef43fd27fef3c..94446944b5430 100644 --- a/op-alt-da/daserver.go +++ b/op-alt-da/daserver.go @@ -187,8 +187,8 @@ func (d *DAServer) HandlePut(w http.ResponseWriter, r *http.Request) { } } -func (b *DAServer) Endpoint() string { - return b.listener.Addr().String() +func (b *DAServer) HttpEndpoint() string { + return fmt.Sprintf("http://%s", b.listener.Addr().String()) } func (b *DAServer) Stop() error { diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 39ebf2f25b240..9bdc242af2bf5 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -23,6 +23,7 @@ import ( "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" + "golang.org/x/sync/errgroup" ) var ( @@ -299,6 +300,12 @@ func (l *BatchSubmitter) loop() { receiptsCh := make(chan txmgr.TxReceipt[txRef]) queue := txmgr.NewQueue[txRef](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions) + daGroup := &errgroup.Group{} + // errgroup with limit of 0 means no goroutine is able to run concurrently, + // so we only set the limit if it is greater than 0. + if l.Config.MaxConcurrentDARequests > 0 { + daGroup.SetLimit(int(l.Config.MaxConcurrentDARequests)) + } // start the receipt/result processing loop receiptLoopDone := make(chan struct{}) @@ -334,8 +341,11 @@ func (l *BatchSubmitter) loop() { defer ticker.Stop() publishAndWait := func() { - l.publishStateToL1(queue, receiptsCh) + l.publishStateToL1(queue, receiptsCh, daGroup) if !l.Txmgr.IsClosed() { + l.Log.Info("Wait for pure DA writes, not L1 txs") + _ = daGroup.Wait() + l.Log.Info("Wait for L1 writes (blobs or DA commitments)") queue.Wait() } else { l.Log.Info("Txmgr is closed, remaining channel data won't be sent") @@ -369,7 +379,7 @@ func (l *BatchSubmitter) loop() { l.clearState(l.shutdownCtx) continue } - l.publishStateToL1(queue, receiptsCh) + l.publishStateToL1(queue, receiptsCh, daGroup) case <-l.shutdownCtx.Done(): if l.Txmgr.IsClosed() { l.Log.Info("Txmgr is closed, remaining channel data won't be sent") @@ -426,14 +436,14 @@ func (l *BatchSubmitter) waitNodeSync() error { // publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is // no more data to queue for publishing or if there was an error queing the data. 
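For reference, a self-contained sketch (assumed example, not batcher code) of the golang.org/x/sync/errgroup behavior that the `daGroup` setup above relies on: a limit of 0 prevents any goroutine from being started, `SetLimit` bounds concurrency, `TryGo` returns false instead of blocking when the limit is reached, and `Wait` blocks until every started goroutine has returned.

```go
// Illustrative errgroup semantics; the "DA request" here is a stand-in sleep.
package main

import (
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	g := &errgroup.Group{} // zero value: unlimited concurrency unless SetLimit is called
	g.SetLimit(2)          // allow at most 2 concurrent "DA requests"

	for i := 0; i < 4; i++ {
		i := i
		started := g.TryGo(func() error {
			time.Sleep(100 * time.Millisecond) // simulate a slow DA put
			fmt.Println("finished request", i)
			return nil
		})
		if !started {
			// Limit reached: TryGo does not block, so the caller must requeue the work
			// (the batcher does this via recordFailedDARequest with a nil error).
			fmt.Println("limit reached, requeueing request", i)
		}
	}
	_ = g.Wait() // analogous to daGroup.Wait() in publishAndWait
}
```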
-func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) { +func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) { for { // if the txmgr is closed, we stop the transaction sending if l.Txmgr.IsClosed() { l.Log.Info("Txmgr is closed, aborting state publishing") return } - err := l.publishTxToL1(l.killCtx, queue, receiptsCh) + err := l.publishTxToL1(l.killCtx, queue, receiptsCh, daGroup) if err != nil { if err != io.EOF { l.Log.Error("Error publishing tx to l1", "err", err) @@ -483,7 +493,7 @@ func (l *BatchSubmitter) clearState(ctx context.Context) { } // publishTxToL1 submits a single state tx to the L1 -func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) error { +func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) error { // send all available transactions l1tip, err := l.l1Tip(ctx) if err != nil { @@ -492,7 +502,8 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t } l.recordL1Tip(l1tip) - // Collect next transaction data + // Collect next transaction data. This pulls data out of the channel, so we need to make sure + // to put it back if ever da or txmgr requests fail, by calling l.recordFailedDARequest/recordFailedTx. txdata, err := l.state.TxData(l1tip.ID()) if err == io.EOF { @@ -503,7 +514,7 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t return err } - if err = l.sendTransaction(ctx, txdata, queue, receiptsCh); err != nil { + if err = l.sendTransaction(ctx, txdata, queue, receiptsCh, daGroup); err != nil { return fmt.Errorf("BatchSubmitter.sendTransaction failed: %w", err) } return nil @@ -550,9 +561,48 @@ func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txRef], receiptsCh // sendTransaction creates & queues for sending a transaction to the batch inbox address with the given `txData`. // The method will block if the queue's MaxPendingTransactions is exceeded. -func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) error { +func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) error { var err error - // Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit. + + // if Alt DA is enabled we post the txdata to the DA Provider and replace it with the commitment. + if l.Config.UseAltDA { + // sanity checks + if nf := len(txdata.frames); nf != 1 { + l.Log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf) + } + if txdata.asBlob { + l.Log.Crit("Unexpected blob txdata with AltDA enabled") + } + + // when posting txdata to an external DA Provider, we use a goroutine to avoid blocking the main loop + // since it may take a while for the request to return. + goroutineSpawned := daGroup.TryGo(func() error { + // TODO: probably shouldn't be using the global shutdownCtx here, see https://go.dev/blog/context-and-structs + // but sendTransaction receives l.killCtx as an argument, which currently is only canceled after waiting for the main loop + // to exit, which would wait on this DA call to finish, which would take a long time. 
+ // So we prefer to mimic the behavior of txmgr and cancel all pending DA/txmgr requests when the batcher is stopped. + comm, err := l.AltDA.SetInput(l.shutdownCtx, txdata.CallData()) + if err != nil { + l.Log.Error("Failed to post input to Alt DA", "error", err) + // requeue frame if we fail to post to the DA Provider so it can be retried + // note: this assumes that the da server caches requests, otherwise it might lead to resubmissions of the blobs + l.recordFailedDARequest(txdata.ID(), err) + return nil + } + l.Log.Info("Set altda input", "commitment", comm, "tx", txdata.ID()) + candidate := l.calldataTxCandidate(comm.TxData()) + l.queueTx(txdata, false, candidate, queue, receiptsCh) + return nil + }) + if !goroutineSpawned { + // We couldn't start the goroutine because the errgroup.Group limit + // is already reached. Since we can't send the txdata, we have to + // return it for later processing. We use nil error to skip error logging. + l.recordFailedDARequest(txdata.ID(), nil) + } + // we return nil to allow publishStateToL1 to keep processing the next txdata + return nil + } var candidate *txmgr.TxCandidate if txdata.asBlob { @@ -568,21 +618,7 @@ func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, que if nf := len(txdata.frames); nf != 1 { l.Log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf) } - data := txdata.CallData() - // if AltDA is enabled we post the txdata to the DA Provider and replace it with the commitment. - if l.Config.UseAltDA { - comm, err := l.AltDA.SetInput(ctx, data) - if err != nil { - l.Log.Error("Failed to post input to Alt DA", "error", err) - // requeue frame if we fail to post to the DA Provider so it can be retried - l.recordFailedTx(txdata.ID(), err) - return nil - } - l.Log.Info("Set AltDA input", "commitment", comm, "tx", txdata.ID()) - // signal AltDA commitment tx with TxDataVersion1 - data = comm.TxData() - } - candidate = l.calldataTxCandidate(data) + candidate = l.calldataTxCandidate(txdata.CallData()) } l.queueTx(txdata, false, candidate, queue, receiptsCh) @@ -642,6 +678,13 @@ func (l *BatchSubmitter) recordL1Tip(l1tip eth.L1BlockRef) { l.Metr.RecordLatestL1Block(l1tip) } +func (l *BatchSubmitter) recordFailedDARequest(id txID, err error) { + if err != nil { + l.Log.Warn("DA request failed", logFields(id, err)...) + } + l.state.TxFailed(id) +} + func (l *BatchSubmitter) recordFailedTx(id txID, err error) { l.Log.Warn("Transaction failed to send", logFields(id, err)...) l.state.TxFailed(id) diff --git a/op-batcher/batcher/service.go b/op-batcher/batcher/service.go index 00d3d32071f7f..1fe813f9913f4 100644 --- a/op-batcher/batcher/service.go +++ b/op-batcher/batcher/service.go @@ -39,6 +39,8 @@ type BatcherConfig struct { // UseAltDA is true if the rollup config has a DA challenge address so the batcher // will post inputs to the DA server and post commitments to blobs or calldata. 
UseAltDA bool + // maximum number of concurrent blob put requests to the DA server + MaxConcurrentDARequests uint64 WaitNodeSync bool CheckRecentTxsDepth int diff --git a/op-e2e/setup.go b/op-e2e/setup.go index c0168b7d207d8..2e97208c12a35 100644 --- a/op-e2e/setup.go +++ b/op-e2e/setup.go @@ -39,6 +39,7 @@ import ( "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" + altda "github.com/ethereum-optimism/optimism/op-alt-da" bss "github.com/ethereum-optimism/optimism/op-batcher/batcher" batcherFlags "github.com/ethereum-optimism/optimism/op-batcher/flags" "github.com/ethereum-optimism/optimism/op-chain-ops/genesis" @@ -164,18 +165,19 @@ func DefaultSystemConfig(t testing.TB) SystemConfig { }, }, Loggers: map[string]log.Logger{ - RoleVerif: testlog.Logger(t, log.LevelInfo).New("role", RoleVerif), - RoleSeq: testlog.Logger(t, log.LevelInfo).New("role", RoleSeq), - "batcher": testlog.Logger(t, log.LevelInfo).New("role", "batcher"), - "proposer": testlog.Logger(t, log.LevelInfo).New("role", "proposer"), + RoleVerif: testlog.Logger(t, log.LevelInfo).New("role", RoleVerif), + RoleSeq: testlog.Logger(t, log.LevelInfo).New("role", RoleSeq), + "batcher": testlog.Logger(t, log.LevelInfo).New("role", "batcher"), + "proposer": testlog.Logger(t, log.LevelInfo).New("role", "proposer"), + "da-server": testlog.Logger(t, log.LevelInfo).New("role", "da-server"), }, - GethOptions: map[string][]geth.GethOption{}, - P2PTopology: nil, // no P2P connectivity by default - NonFinalizedProposals: false, - ExternalL2Shim: config.ExternalL2Shim, - DataAvailabilityType: batcherFlags.CalldataType, - MaxPendingTransactions: 1, - BatcherTargetNumFrames: 1, + GethOptions: map[string][]geth.GethOption{}, + P2PTopology: nil, // no P2P connectivity by default + NonFinalizedProposals: false, + ExternalL2Shim: config.ExternalL2Shim, + DataAvailabilityType: batcherFlags.CalldataType, + BatcherMaxPendingTransactions: 1, + BatcherTargetNumFrames: 1, } } @@ -298,12 +300,16 @@ type SystemConfig struct { // If >0, limits the number of blocks per span batch BatcherMaxBlocksPerSpanBatch int + // BatcherMaxPendingTransactions determines how many transactions the batcher will try to send + // concurrently. 0 means unlimited. + BatcherMaxPendingTransactions uint64 + + // BatcherMaxConcurrentDARequest determines how many DAserver requests the batcher is allowed to + // make concurrently. 0 means unlimited. + BatcherMaxConcurrentDARequest uint64 + // SupportL1TimeTravel determines if the L1 node supports quickly skipping forward in time SupportL1TimeTravel bool - - // MaxPendingTransactions determines how many transactions the batcher will try to send - // concurrently. 0 means unlimited. 
- MaxPendingTransactions uint64 } type System struct { @@ -319,6 +325,7 @@ type System struct { L2OutputSubmitter *l2os.ProposerService BatchSubmitter *bss.BatcherService Mocknet mocknet.Mocknet + FakeAltDAServer *altda.FakeDAServer L1BeaconAPIAddr endpoint.RestHTTP @@ -543,6 +550,16 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste } } + var rollupAltDAConfig *rollup.AltDAConfig + if cfg.DeployConfig.UseAltDA { + rollupAltDAConfig = &rollup.AltDAConfig{ + DAChallengeAddress: cfg.L1Deployments.DataAvailabilityChallengeProxy, + DAChallengeWindow: cfg.DeployConfig.DAChallengeWindow, + DAResolveWindow: cfg.DeployConfig.DAResolveWindow, + CommitmentType: altda.GenericCommitmentString, + } + } + makeRollupConfig := func() rollup.Config { return rollup.Config{ Genesis: rollup.Genesis{ @@ -574,6 +591,7 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste GraniteTime: cfg.DeployConfig.GraniteTime(uint64(cfg.DeployConfig.L1GenesisBlockTimestamp)), InteropTime: cfg.DeployConfig.InteropTime(uint64(cfg.DeployConfig.L1GenesisBlockTimestamp)), ProtocolVersionsAddress: cfg.L1Deployments.ProtocolVersionsProxy, + AltDAConfig: rollupAltDAConfig, } } defaultConfig := makeRollupConfig() @@ -819,11 +837,27 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste compressionAlgo = derive.Brotli10 } + var batcherAltDACLIConfig altda.CLIConfig + if cfg.DeployConfig.UseAltDA { + fakeAltDAServer := altda.NewFakeDAServer("127.0.0.1", 0, sys.Cfg.Loggers["da-server"]) + if err := fakeAltDAServer.Start(); err != nil { + return nil, fmt.Errorf("failed to start fake altDA server: %w", err) + } + sys.FakeAltDAServer = fakeAltDAServer + + batcherAltDACLIConfig = altda.CLIConfig{ + Enabled: cfg.DeployConfig.UseAltDA, + DAServerURL: fakeAltDAServer.HttpEndpoint(), + VerifyOnRead: true, + GenericDA: true, + MaxConcurrentRequests: cfg.BatcherMaxConcurrentDARequest, + } + } batcherCLIConfig := &bss.CLIConfig{ L1EthRpc: sys.EthInstances[RoleL1].UserRPC().RPC(), L2EthRpc: sys.EthInstances[RoleSeq].UserRPC().RPC(), RollupRpc: sys.RollupNodes[RoleSeq].UserRPC().RPC(), - MaxPendingTransactions: cfg.MaxPendingTransactions, + MaxPendingTransactions: cfg.BatcherMaxPendingTransactions, MaxChannelDuration: 1, MaxL1TxSize: batcherMaxL1TxSizeBytes, TestUseMaxTxSizeForBlobs: cfg.BatcherUseMaxTxSizeForBlobs, @@ -841,6 +875,7 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste MaxBlocksPerSpanBatch: cfg.BatcherMaxBlocksPerSpanBatch, DataAvailabilityType: sys.Cfg.DataAvailabilityType, CompressionAlgo: compressionAlgo, + AltDA: batcherAltDACLIConfig, } // Batch Submitter batcher, err := bss.BatcherServiceFromCLIConfig(context.Background(), "0.0.1", batcherCLIConfig, sys.Cfg.Loggers["batcher"]) diff --git a/op-e2e/system_test.go b/op-e2e/system_test.go index a168a8c90df62..d75147ae21460 100644 --- a/op-e2e/system_test.go +++ b/op-e2e/system_test.go @@ -14,6 +14,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/stretchr/testify/require" + "github.com/ethereum-optimism/optimism/op-batcher/flags" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" @@ -1362,7 +1363,7 @@ func TestBatcherMultiTx(t *testing.T) { InitParallel(t) cfg := DefaultSystemConfig(t) - cfg.MaxPendingTransactions = 0 // no limit on parallel txs + cfg.BatcherMaxPendingTransactions = 0 // no limit on parallel txs // ensures that batcher txs are as small as possible 
cfg.BatcherMaxL1TxSizeBytes = derive.FrameV0OverHeadSize + 1 /*version bytes*/ + 1 cfg.DisableBatcher = true @@ -1402,6 +1403,60 @@ func TestBatcherMultiTx(t *testing.T) { t.Fatal("Expected at least 10 transactions from the batcher") } +func TestBatcherConcurrentAltDARequests(t *testing.T) { + InitParallel(t) + + cfg := DefaultSystemConfig(t) + cfg.DeployConfig.UseAltDA = true + cfg.BatcherMaxPendingTransactions = 0 // no limit on parallel txs + // ensures that batcher txs are as small as possible + cfg.BatcherMaxL1TxSizeBytes = derive.FrameV0OverHeadSize + 1 /*version bytes*/ + 1 + cfg.BatcherBatchType = 0 + cfg.DataAvailabilityType = flags.CalldataType + cfg.BatcherMaxConcurrentDARequest = 0 // no limit + + // disable batcher because we start it manually below + cfg.DisableBatcher = true + sys, err := cfg.Start(t) + require.NoError(t, err, "Error starting up system") + defer sys.Close() + + // make every request take 5 seconds, such that only concurrent requests will be able to make progress fast enough + sys.FakeAltDAServer.SetPutRequestLatency(5 * time.Second) + + l1Client := sys.NodeClient("l1") + l2Seq := sys.NodeClient("sequencer") + + // we wait for some L2 blocks to have been produced, just to make sure the sequencer is working properly + _, err = geth.WaitForBlock(big.NewInt(10), l2Seq, time.Duration(cfg.DeployConfig.L2BlockTime*15)*time.Second) + require.NoError(t, err, "Waiting for L2 blocks") + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + startingL1BlockNum, err := l1Client.BlockNumber(ctx) + require.NoError(t, err) + + // start batch submission + driver := sys.BatchSubmitter.TestDriver() + err = driver.StartBatchSubmitting() + require.NoError(t, err) + + totalTxCount := 0 + // wait for up to 10 L1 blocks, expecting 10 L2 batcher txs in them. + // usually only 3 is required, but it's possible additional L1 blocks will be created + // before the batcher starts, so we wait additional blocks. + for i := int64(0); i < 10; i++ { + block, err := geth.WaitForBlock(big.NewInt(int64(startingL1BlockNum)+i), l1Client, time.Duration(cfg.DeployConfig.L1BlockTime*5)*time.Second) + require.NoError(t, err, "Waiting for l1 blocks") + totalTxCount += len(block.Transactions()) + + if totalTxCount >= 10 { + return + } + } + + t.Fatal("Expected at least 10 transactions from the batcher") +} + func latestBlock(t *testing.T, client *ethclient.Client) uint64 { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel()
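To tie the pieces of this diff together, here is an illustrative-only sketch (not an existing test in the repo) that combines a FakeDAServer with injected put latency and a DAClient whose put timeout is shorter than that latency, so the request is expected to fail. It assumes the client exposes an exported SetInput method (the interface method used by the batcher) and is placed in the altda package, matching daclient_test.go.

```go
package altda

import (
	"context"
	"testing"
	"time"

	"github.com/ethereum-optimism/optimism/op-service/testlog"
	"github.com/ethereum/go-ethereum/log"
	"github.com/stretchr/testify/require"
)

// TestDAClientPutTimeoutSketch is a hypothetical test showing the new latency and
// timeout knobs interacting; names and values are chosen only for illustration.
func TestDAClientPutTimeoutSketch(t *testing.T) {
	server := NewFakeDAServer("127.0.0.1", 0, testlog.Logger(t, log.LevelDebug))
	require.NoError(t, server.Start())
	defer func() { _ = server.Stop() }()

	// Every put now takes longer than the client is willing to wait.
	server.SetPutRequestLatency(2 * time.Second)

	cfg := CLIConfig{
		Enabled:     true,
		DAServerURL: server.HttpEndpoint(),
		GenericDA:   true,
		PutTimeout:  100 * time.Millisecond, // shorter than the injected latency
	}
	client := cfg.NewDAClient()

	_, err := client.SetInput(context.Background(), []byte("some input"))
	require.Error(t, err) // the per-request http.Client timeout should trip first
}
```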