Skip to content

Commit

Permalink
feat(op-conductor): implement startup handshake (ethereum-optimism#12047
Browse files Browse the repository at this point in the history
)

* op-node waits establishes connection to conductor before starting in sequencer enabled mode

* Added conductor enabled api to op-node

* check node enabled conductor during conductor startup

* update logs

* Change back to lazy initialization

* Add method not found check
  • Loading branch information
0x00101010 authored Sep 26, 2024
1 parent 5798c5f commit d83f12d
Show file tree
Hide file tree
Showing 14 changed files with 134 additions and 3 deletions.
56 changes: 56 additions & 0 deletions op-conductor/client/mocks/SequencerControl.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions op-conductor/client/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type SequencerControl interface {
SequencerActive(ctx context.Context) (bool, error)
LatestUnsafeBlock(ctx context.Context) (eth.BlockInfo, error)
PostUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error
ConductorEnabled(ctx context.Context) (bool, error)
}

// NewSequencerControl creates a new SequencerControl instance.
Expand Down Expand Up @@ -59,3 +60,8 @@ func (s *sequencerController) SequencerActive(ctx context.Context) (bool, error)
func (s *sequencerController) PostUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error {
return s.node.PostUnsafePayload(ctx, payload)
}

// ConductorEnabled implements SequencerControl.
func (s *sequencerController) ConductorEnabled(ctx context.Context) (bool, error) {
return s.node.ConductorEnabled(ctx)
}
20 changes: 20 additions & 0 deletions op-conductor/conductor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/httputil"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum-optimism/optimism/op-service/retry"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum-optimism/optimism/op-service/sources"
)
Expand Down Expand Up @@ -140,6 +141,25 @@ func (c *OpConductor) initSequencerControl(ctx context.Context) error {
node := sources.NewRollupClient(nc)
c.ctrl = client.NewSequencerControl(exec, node)

enabled, err := retry.Do(ctx, 60, retry.Fixed(5*time.Second), func() (bool, error) {
enabled, err := c.ctrl.ConductorEnabled(ctx)
if rpcErr, ok := err.(rpc.Error); ok {
errCode := rpcErr.ErrorCode()
errText := strings.ToLower(err.Error())
if errCode == -32601 || strings.Contains(errText, "method not found") { // method not found error
c.log.Warn("Warning: conductorEnabled method not found, please upgrade your op-node to the latest version, continuing...")
return true, nil
}
}
return enabled, err
})
if err != nil {
return errors.Wrap(err, "failed to connect to sequencer")
}
if !enabled {
return errors.New("conductor is not enabled on sequencer, exiting...")
}

return c.updateSequencerActiveStatus()
}

Expand Down
4 changes: 4 additions & 0 deletions op-e2e/actions/helpers/l2_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@ func (s *l2VerifierBackend) OnUnsafeL2Payload(ctx context.Context, envelope *eth
return nil
}

func (s *l2VerifierBackend) ConductorEnabled(ctx context.Context) (bool, error) {
return false, nil
}

func (s *L2Verifier) DerivationMetricsTracer() *testutils.TestDerivationMetrics {
return s.derivationMetrics
}
Expand Down
8 changes: 8 additions & 0 deletions op-node/node/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type driverClient interface {
SequencerActive(context.Context) (bool, error)
OnUnsafeL2Payload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error
OverrideLeader(ctx context.Context) error
ConductorEnabled(ctx context.Context) (bool, error)
}

type SafeDBReader interface {
Expand Down Expand Up @@ -98,6 +99,13 @@ func (n *adminAPI) OverrideLeader(ctx context.Context) error {
return n.dr.OverrideLeader(ctx)
}

// ConductorEnabled returns true if the sequencer conductor is enabled.
func (n *adminAPI) ConductorEnabled(ctx context.Context) (bool, error) {
recordDur := n.M.RecordRPCServerRequest("admin_conductorEnabled")
defer recordDur()
return n.dr.ConductorEnabled(ctx)
}

type nodeAPI struct {
config *rollup.Config
client l2EthClient
Expand Down
7 changes: 6 additions & 1 deletion op-node/node/conductor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type ConductorClient struct {
var _ conductor.SequencerConductor = &ConductorClient{}

// NewConductorClient returns a new conductor client for the op-conductor RPC service.
func NewConductorClient(cfg *Config, log log.Logger, metrics *metrics.Metrics) *ConductorClient {
func NewConductorClient(cfg *Config, log log.Logger, metrics *metrics.Metrics) conductor.SequencerConductor {
return &ConductorClient{
cfg: cfg,
metrics: metrics,
Expand All @@ -53,6 +53,11 @@ func (c *ConductorClient) initialize() error {
return nil
}

// Enabled returns true if the conductor is enabled, and since the conductor client is initialized, the conductor is always enabled.
func (c *ConductorClient) Enabled(ctx context.Context) bool {
return true
}

// Leader returns true if this node is the leader sequencer.
func (c *ConductorClient) Leader(ctx context.Context) (bool, error) {
if c.overrideLeader.Load() {
Expand Down
4 changes: 4 additions & 0 deletions op-node/node/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,10 @@ func (c *mockDriverClient) OverrideLeader(ctx context.Context) error {
return c.Mock.MethodCalled("OverrideLeader").Get(0).(error)
}

func (c *mockDriverClient) ConductorEnabled(ctx context.Context) (bool, error) {
return c.Mock.MethodCalled("ConductorEnabled").Get(0).(bool), nil
}

type mockSafeDBReader struct {
mock.Mock
}
Expand Down
7 changes: 7 additions & 0 deletions op-node/rollup/conductor/conductor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
// SequencerConductor is an interface for the driver to communicate with the sequencer conductor.
// It is used to determine if the current node is the active sequencer, and to commit unsafe payloads to the conductor log.
type SequencerConductor interface {
// Enabled returns true if the conductor is enabled.
Enabled(ctx context.Context) bool
// Leader returns true if this node is the leader sequencer.
Leader(ctx context.Context) (bool, error)
// CommitUnsafePayload commits an unsafe payload to the conductor FSM.
Expand All @@ -24,6 +26,11 @@ type NoOpConductor struct{}

var _ SequencerConductor = &NoOpConductor{}

// Enabled implements SequencerConductor.
func (c *NoOpConductor) Enabled(ctx context.Context) bool {
return false
}

// Leader returns true if this node is the leader sequencer. NoOpConductor always returns true.
func (c *NoOpConductor) Leader(ctx context.Context) (bool, error) {
return true, nil
Expand Down
4 changes: 4 additions & 0 deletions op-node/rollup/driver/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,10 @@ func (s *Driver) OverrideLeader(ctx context.Context) error {
return s.sequencer.OverrideLeader(ctx)
}

func (s *Driver) ConductorEnabled(ctx context.Context) (bool, error) {
return s.sequencer.ConductorEnabled(ctx), nil
}

// SyncStatus blocks the driver event loop and captures the syncing status.
func (s *Driver) SyncStatus(ctx context.Context) (*eth.SyncStatus, error) {
return s.statusTracker.SyncStatus(), nil
Expand Down
4 changes: 4 additions & 0 deletions op-node/rollup/sequencing/disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,8 @@ func (ds DisabledSequencer) OverrideLeader(ctx context.Context) error {
return ErrSequencerNotEnabled
}

func (ds DisabledSequencer) ConductorEnabled(ctx context.Context) bool {
return false
}

func (ds DisabledSequencer) Close() {}
1 change: 1 addition & 0 deletions op-node/rollup/sequencing/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ type SequencerIface interface {
Stop(ctx context.Context) (hash common.Hash, err error)
SetMaxSafeLag(ctx context.Context, v uint64) error
OverrideLeader(ctx context.Context) error
ConductorEnabled(ctx context.Context) bool
Close()
}
6 changes: 4 additions & 2 deletions op-node/rollup/sequencing/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,8 +617,6 @@ func (d *Sequencer) Init(ctx context.Context, active bool) error {
d.emitter.Emit(engine.ForkchoiceRequestEvent{})

if active {
// TODO(#11121): should the conductor be checked on startup?
// The conductor was previously not being checked in this case, but that may be a bug.
return d.forceStart()
} else {
if err := d.listener.SequencerStopped(); err != nil {
Expand Down Expand Up @@ -712,6 +710,10 @@ func (d *Sequencer) OverrideLeader(ctx context.Context) error {
return d.conductor.OverrideLeader(ctx)
}

func (d *Sequencer) ConductorEnabled(ctx context.Context) bool {
return d.conductor.Enabled(ctx)
}

func (d *Sequencer) Close() {
d.conductor.Close()
d.asyncGossip.Stop()
Expand Down
4 changes: 4 additions & 0 deletions op-node/rollup/sequencing/sequencer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ type FakeConductor struct {

var _ conductor.SequencerConductor = &FakeConductor{}

func (c *FakeConductor) Enabled(ctx context.Context) bool {
return true
}

func (c *FakeConductor) Leader(ctx context.Context) (bool, error) {
return c.leader, nil
}
Expand Down
6 changes: 6 additions & 0 deletions op-service/sources/rollupclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ func (r *RollupClient) OverrideLeader(ctx context.Context) error {
return r.rpc.CallContext(ctx, nil, "admin_overrideLeader")
}

func (r *RollupClient) ConductorEnabled(ctx context.Context) (bool, error) {
var result bool
err := r.rpc.CallContext(ctx, &result, "admin_conductorEnabled")
return result, err
}

func (r *RollupClient) SetLogLevel(ctx context.Context, lvl slog.Level) error {
return r.rpc.CallContext(ctx, nil, "admin_setLogLevel", lvl.String())
}
Expand Down

0 comments on commit d83f12d

Please sign in to comment.