Skip to content

Commit

Permalink
op-conductor: Better context management, graceful shutdown (ethereum-…
Browse files Browse the repository at this point in the history
  • Loading branch information
BrianBland authored Jun 17, 2024
1 parent 71b9311 commit 9147c6e
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 29 deletions.
2 changes: 1 addition & 1 deletion op-conductor/conductor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ var _ cliapp.Lifecycle = (*OpConductor)(nil)
func (oc *OpConductor) Start(ctx context.Context) error {
oc.log.Info("starting OpConductor")

if err := oc.hmon.Start(); err != nil {
if err := oc.hmon.Start(ctx); err != nil {
return errors.Wrap(err, "failed to start health monitor")
}

Expand Down
2 changes: 1 addition & 1 deletion op-conductor/conductor/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (s *OpConductorTestSuite) SetupTest() {
s.conductor = conductor

s.healthUpdateCh = make(chan error, 1)
s.hmon.EXPECT().Start().Return(nil)
s.hmon.EXPECT().Start(mock.Anything).Return(nil)
s.conductor.healthUpdateCh = s.healthUpdateCh

s.leaderUpdateCh = make(chan bool, 1)
Expand Down
27 changes: 16 additions & 11 deletions op-conductor/health/mocks/HealthMonitor.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 13 additions & 12 deletions op-conductor/health/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type HealthMonitor interface {
// Subscribe returns a channel that will be notified for every health check.
Subscribe() <-chan error
// Start starts the health check.
Start() error
Start(ctx context.Context) error
// Stop stops the health check.
Stop() error
}
Expand All @@ -39,7 +39,6 @@ func NewSequencerHealthMonitor(log log.Logger, metrics metrics.Metricer, interva
return &SequencerHealthMonitor{
log: log,
metrics: metrics,
done: make(chan struct{}),
interval: interval,
healthUpdateCh: make(chan error),
rollupCfg: rollupCfg,
Expand All @@ -57,7 +56,7 @@ func NewSequencerHealthMonitor(log log.Logger, metrics metrics.Metricer, interva
type SequencerHealthMonitor struct {
log log.Logger
metrics metrics.Metricer
done chan struct{}
cancel context.CancelFunc
wg sync.WaitGroup

rollupCfg *rollup.Config
Expand All @@ -79,10 +78,13 @@ type SequencerHealthMonitor struct {
var _ HealthMonitor = (*SequencerHealthMonitor)(nil)

// Start implements HealthMonitor.
func (hm *SequencerHealthMonitor) Start() error {
func (hm *SequencerHealthMonitor) Start(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
hm.cancel = cancel

hm.log.Info("starting health monitor")
hm.wg.Add(1)
go hm.loop()
go hm.loop(ctx)

hm.log.Info("health monitor started")
return nil
Expand All @@ -91,7 +93,7 @@ func (hm *SequencerHealthMonitor) Start() error {
// Stop implements HealthMonitor.
func (hm *SequencerHealthMonitor) Stop() error {
hm.log.Info("stopping health monitor")
close(hm.done)
hm.cancel()
hm.wg.Wait()

hm.log.Info("health monitor stopped")
Expand All @@ -103,7 +105,7 @@ func (hm *SequencerHealthMonitor) Subscribe() <-chan error {
return hm.healthUpdateCh
}

func (hm *SequencerHealthMonitor) loop() {
func (hm *SequencerHealthMonitor) loop(ctx context.Context) {
defer hm.wg.Done()

duration := time.Duration(hm.interval) * time.Second
Expand All @@ -112,16 +114,16 @@ func (hm *SequencerHealthMonitor) loop() {

for {
select {
case <-hm.done:
case <-ctx.Done():
return
case <-ticker.C:
err := hm.healthCheck()
err := hm.healthCheck(ctx)
hm.metrics.RecordHealthCheck(err == nil, err)
// Ensure that we exit cleanly if told to shut down while still waiting to publish the health update
select {
case hm.healthUpdateCh <- err:
continue
case <-hm.done:
case <-ctx.Done():
return
}
}
Expand All @@ -133,8 +135,7 @@ func (hm *SequencerHealthMonitor) loop() {
// 2. unsafe head is not too far behind now (measured by unsafeInterval)
// 3. safe head is progressing every configured batch submission interval
// 4. peer count is above the configured minimum
func (hm *SequencerHealthMonitor) healthCheck() error {
ctx := context.Background()
func (hm *SequencerHealthMonitor) healthCheck(ctx context.Context) error {
status, err := hm.node.SyncStatus(ctx)
if err != nil {
hm.log.Error("health monitor failed to get sync status", "err", err)
Expand Down
8 changes: 4 additions & 4 deletions op-conductor/health/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"

"github.com/ethereum-optimism/optimism/op-conductor/metrics"
Expand Down Expand Up @@ -53,11 +54,10 @@ func (s *HealthMonitorTestSuite) SetupMonitor(
ps1 := &p2p.PeerStats{
Connected: healthyPeerCount,
}
mockP2P.EXPECT().PeerStats(context.Background()).Return(ps1, nil)
mockP2P.EXPECT().PeerStats(mock.Anything).Return(ps1, nil)
}
monitor := &SequencerHealthMonitor{
log: s.log,
done: make(chan struct{}),
interval: s.interval,
metrics: &metrics.NoopMetricsImpl{},
healthUpdateCh: make(chan error),
Expand All @@ -70,7 +70,7 @@ func (s *HealthMonitorTestSuite) SetupMonitor(
node: mockRollupClient,
p2p: mockP2P,
}
err := monitor.Start()
err := monitor.Start(context.Background())
s.NoError(err)
return monitor
}
Expand All @@ -88,7 +88,7 @@ func (s *HealthMonitorTestSuite) TestUnhealthyLowPeerCount() {
ps1 := &p2p.PeerStats{
Connected: unhealthyPeerCount,
}
pc.EXPECT().PeerStats(context.Background()).Return(ps1, nil).Times(1)
pc.EXPECT().PeerStats(mock.Anything).Return(ps1, nil).Times(1)

monitor := s.SetupMonitor(now, 60, 60, rc, pc)

Expand Down

0 comments on commit 9147c6e

Please sign in to comment.