Skip to content

Commit

Permalink
feat: initial goroutine blob submission implementation
Browse files Browse the repository at this point in the history
test(batcher): add e2e test for concurrent altda requests

doc: add explanation comment for FakeDAServer

chore: fix if condition in altda sendTransaction path

feat: add maxConcurrentDaRequests config flag + semaphore

refactor: batcher to use errgroup for da instead of separate semaphore/waitgroup

fix: nil pointer bug after using wrong function after rebase

fix: defn of maxConcurrentDaRequests=0

fix: TestBatcherConcurrentAltDARequests

chore: remove unneeded if statement around time.Sleep

refactor: use TryGo instead of Go to make logic local and easier to read

chore: clean up some comments in batcher

chore: make batcher shutdown cancel pending altda requests by using shutdownCtx instead of killCtx
  • Loading branch information
samlaf committed Aug 30, 2024
1 parent 36f093a commit c42fedd
Show file tree
Hide file tree
Showing 9 changed files with 321 additions and 97 deletions.
56 changes: 42 additions & 14 deletions op-alt-da/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@ package altda
import (
"fmt"
"net/url"
"time"

"github.com/urfave/cli/v2"
)

var (
EnabledFlagName = altDAFlags("enabled")
DaServerAddressFlagName = altDAFlags("da-server")
VerifyOnReadFlagName = altDAFlags("verify-on-read")
DaServiceFlag = altDAFlags("da-service")
EnabledFlagName = altDAFlags("enabled")
DaServerAddressFlagName = altDAFlags("da-server")
VerifyOnReadFlagName = altDAFlags("verify-on-read")
DaServiceFlagName = altDAFlags("da-service")
PutTimeoutFlagName = altDAFlags("put-timeout")
GetTimeoutFlagName = altDAFlags("get-timeout")
MaxConcurrentRequestsFlagName = altDAFlags("max-concurrent-da-requests")
)

// altDAFlags returns the flag names for altDA
Expand Down Expand Up @@ -46,20 +50,41 @@ func CLIFlags(envPrefix string, category string) []cli.Flag {
Category: category,
},
&cli.BoolFlag{
Name: DaServiceFlag,
Name: DaServiceFlagName,
Usage: "Use DA service type where commitments are generated by Alt-DA server",
Value: false,
EnvVars: altDAEnvs(envPrefix, "DA_SERVICE"),
Category: category,
},
&cli.DurationFlag{
Name: PutTimeoutFlagName,
Usage: "Timeout for put requests. 0 means no timeout.",
Value: time.Duration(0),
EnvVars: altDAEnvs(envPrefix, "PUT_TIMEOUT"),
},
&cli.DurationFlag{
Name: GetTimeoutFlagName,
Usage: "Timeout for get requests. 0 means no timeout.",
Value: time.Duration(0),
EnvVars: altDAEnvs(envPrefix, "GET_TIMEOUT"),
},
&cli.Uint64Flag{
Name: MaxConcurrentRequestsFlagName,
Usage: "Maximum number of concurrent requests to the DA server",
Value: 1,
EnvVars: altDAEnvs(envPrefix, "MAX_CONCURRENT_DA_REQUESTS"),
},
}
}

type CLIConfig struct {
Enabled bool
DAServerURL string
VerifyOnRead bool
GenericDA bool
Enabled bool
DAServerURL string
VerifyOnRead bool
GenericDA bool
PutTimeout time.Duration
GetTimeout time.Duration
MaxConcurrentRequests uint64
}

func (c CLIConfig) Check() error {
Expand All @@ -75,14 +100,17 @@ func (c CLIConfig) Check() error {
}

func (c CLIConfig) NewDAClient() *DAClient {
return &DAClient{url: c.DAServerURL, verify: c.VerifyOnRead, precompute: !c.GenericDA}
return &DAClient{url: c.DAServerURL, verify: c.VerifyOnRead, precompute: !c.GenericDA, getTimeout: c.GetTimeout, putTimeout: c.PutTimeout}
}

func ReadCLIConfig(c *cli.Context) CLIConfig {
return CLIConfig{
Enabled: c.Bool(EnabledFlagName),
DAServerURL: c.String(DaServerAddressFlagName),
VerifyOnRead: c.Bool(VerifyOnReadFlagName),
GenericDA: c.Bool(DaServiceFlag),
Enabled: c.Bool(EnabledFlagName),
DAServerURL: c.String(DaServerAddressFlagName),
VerifyOnRead: c.Bool(VerifyOnReadFlagName),
GenericDA: c.Bool(DaServiceFlagName),
PutTimeout: c.Duration(PutTimeoutFlagName),
GetTimeout: c.Duration(GetTimeoutFlagName),
MaxConcurrentRequests: c.Uint64(MaxConcurrentRequestsFlagName),
}
}
18 changes: 14 additions & 4 deletions op-alt-da/daclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net/http"
"time"
)

// ErrNotFound is returned when the server could not find the input.
Expand All @@ -23,10 +24,16 @@ type DAClient struct {
verify bool
// whether commitment is precomputable (only applicable to keccak256)
precompute bool
getTimeout time.Duration
putTimeout time.Duration
}

func NewDAClient(url string, verify bool, pc bool) *DAClient {
return &DAClient{url, verify, pc}
return &DAClient{
url: url,
verify: verify,
precompute: pc,
}
}

// GetInput returns the input data for the given encoded commitment bytes.
Expand All @@ -35,7 +42,8 @@ func (c *DAClient) GetInput(ctx context.Context, comm CommitmentData) ([]byte, e
if err != nil {
return nil, fmt.Errorf("failed to create HTTP request: %w", err)
}
resp, err := http.DefaultClient.Do(req)
client := &http.Client{Timeout: c.getTimeout}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -91,7 +99,8 @@ func (c *DAClient) setInputWithCommit(ctx context.Context, comm CommitmentData,
return fmt.Errorf("failed to create HTTP request: %w", err)
}
req.Header.Set("Content-Type", "application/octet-stream")
resp, err := http.DefaultClient.Do(req)
client := &http.Client{Timeout: c.putTimeout}
resp, err := client.Do(req)
if err != nil {
return err
}
Expand All @@ -116,7 +125,8 @@ func (c *DAClient) setInput(ctx context.Context, img []byte) (CommitmentData, er
return nil, fmt.Errorf("failed to create HTTP request: %w", err)
}
req.Header.Set("Content-Type", "application/octet-stream")
resp, err := http.DefaultClient.Do(req)
client := &http.Client{Timeout: c.putTimeout}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
Expand Down
38 changes: 2 additions & 36 deletions op-alt-da/daclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,14 @@ package altda

import (
"context"
"fmt"
"math/rand"
"sync"
"testing"

"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)

type MemStore struct {
db map[string][]byte
lock sync.RWMutex
}

func NewMemStore() *MemStore {
return &MemStore{
db: make(map[string][]byte),
}
}

// Get retrieves the given key if it's present in the key-value store.
func (s *MemStore) Get(ctx context.Context, key []byte) ([]byte, error) {
s.lock.RLock()
defer s.lock.RUnlock()

if entry, ok := s.db[string(key)]; ok {
return common.CopyBytes(entry), nil
}
return nil, ErrNotFound
}

// Put inserts the given value into the key-value store.
func (s *MemStore) Put(ctx context.Context, key []byte, value []byte) error {
s.lock.Lock()
defer s.lock.Unlock()

s.db[string(key)] = common.CopyBytes(value)
return nil
}

func TestDAClientPrecomputed(t *testing.T) {
store := NewMemStore()
logger := testlog.Logger(t, log.LevelDebug)
Expand All @@ -56,7 +22,7 @@ func TestDAClientPrecomputed(t *testing.T) {

cfg := CLIConfig{
Enabled: true,
DAServerURL: fmt.Sprintf("http://%s", server.Endpoint()),
DAServerURL: server.HttpEndpoint(),
VerifyOnRead: true,
}
require.NoError(t, cfg.Check())
Expand Down Expand Up @@ -113,7 +79,7 @@ func TestDAClientService(t *testing.T) {

cfg := CLIConfig{
Enabled: true,
DAServerURL: fmt.Sprintf("http://%s", server.Endpoint()),
DAServerURL: server.HttpEndpoint(),
VerifyOnRead: false,
GenericDA: false,
}
Expand Down
85 changes: 85 additions & 0 deletions op-alt-da/damock.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ import (
"context"
"errors"
"io"
"net/http"
"sync"
"time"

"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/memorydb"
"github.com/ethereum/go-ethereum/log"
Expand Down Expand Up @@ -99,3 +103,84 @@ func (d *AltDADisabled) OnFinalizedHeadSignal(f HeadSignalFn) {
func (d *AltDADisabled) AdvanceL1Origin(ctx context.Context, l1 L1Fetcher, blockId eth.BlockID) error {
return ErrNotEnabled
}

// FakeDAServer is a fake DA server for e2e tests.
// It is a small wrapper around DAServer that allows for setting request latencies,
// to mimic a DA service with slow responses (eg. eigenDA with 10 min batching interval).
type FakeDAServer struct {
*DAServer
putRequestLatency time.Duration
getRequestLatency time.Duration
}

func NewFakeDAServer(host string, port int, log log.Logger) *FakeDAServer {
store := NewMemStore()
fakeDAServer := &FakeDAServer{
DAServer: NewDAServer(host, port, store, log, true),
putRequestLatency: 0,
getRequestLatency: 0,
}
return fakeDAServer
}

func (s *FakeDAServer) HandleGet(w http.ResponseWriter, r *http.Request) {
time.Sleep(s.getRequestLatency)
s.DAServer.HandleGet(w, r)
}

func (s *FakeDAServer) HandlePut(w http.ResponseWriter, r *http.Request) {
time.Sleep(s.putRequestLatency)
s.DAServer.HandlePut(w, r)
}

func (s *FakeDAServer) Start() error {
err := s.DAServer.Start()
if err != nil {
return err
}
// Override the HandleGet/Put method registrations
mux := http.NewServeMux()
mux.HandleFunc("/get/", s.HandleGet)
mux.HandleFunc("/put/", s.HandlePut)
s.httpServer.Handler = mux
return nil
}

func (s *FakeDAServer) SetPutRequestLatency(latency time.Duration) {
s.putRequestLatency = latency
}

func (s *FakeDAServer) SetGetRequestLatency(latency time.Duration) {
s.getRequestLatency = latency
}

type MemStore struct {
db map[string][]byte
lock sync.RWMutex
}

func NewMemStore() *MemStore {
return &MemStore{
db: make(map[string][]byte),
}
}

// Get retrieves the given key if it's present in the key-value store.
func (s *MemStore) Get(ctx context.Context, key []byte) ([]byte, error) {
s.lock.RLock()
defer s.lock.RUnlock()

if entry, ok := s.db[string(key)]; ok {
return common.CopyBytes(entry), nil
}
return nil, ErrNotFound
}

// Put inserts the given value into the key-value store.
func (s *MemStore) Put(ctx context.Context, key []byte, value []byte) error {
s.lock.Lock()
defer s.lock.Unlock()

s.db[string(key)] = common.CopyBytes(value)
return nil
}
4 changes: 2 additions & 2 deletions op-alt-da/daserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ func (d *DAServer) HandlePut(w http.ResponseWriter, r *http.Request) {
}
}

func (b *DAServer) Endpoint() string {
return b.listener.Addr().String()
func (b *DAServer) HttpEndpoint() string {
return fmt.Sprintf("http://%s", b.listener.Addr().String())
}

func (b *DAServer) Stop() error {
Expand Down
Loading

0 comments on commit c42fedd

Please sign in to comment.