From a86b645cc7c91b547fee8acb5bc42f3b19cac379 Mon Sep 17 00:00:00 2001 From: Clay <167740482+claymega@users.noreply.github.com> Date: Mon, 28 Oct 2024 10:38:51 +0800 Subject: [PATCH] op-node: parallel events executor (#4) * run op-node in parallel-event mode * add metrics and fix data race * delete debug log * delete metrics * recover executor_global_test.go * modify the variable type and function name * modify comment --- op-node/node/node.go | 2 +- op-node/rollup/derive/pipeline.go | 28 ++--- op-node/rollup/driver/state.go | 7 +- op-node/rollup/driver/steps.go | 31 ++++-- op-node/rollup/event/executor_parallel.go | 120 ++++++++++++++++++++++ op-node/rollup/event/system.go | 14 +-- op-node/rollup/sequencing/disabled.go | 2 + op-node/rollup/sequencing/iface.go | 2 + op-node/rollup/sequencing/sequencer.go | 30 ++++++ op-node/rollup/status/status.go | 9 +- op-service/metrics/ref_metrics.go | 9 +- 11 files changed, 220 insertions(+), 34 deletions(-) create mode 100644 op-node/rollup/event/executor_parallel.go diff --git a/op-node/node/node.go b/op-node/node/node.go index 9d9f6a4343ac..6d7cb9968fdd 100644 --- a/op-node/node/node.go +++ b/op-node/node/node.go @@ -165,7 +165,7 @@ func (n *OpNode) init(ctx context.Context, cfg *Config) error { func (n *OpNode) initEventSystem() { // This executor will be configurable in the future, for parallel event processing - executor := event.NewGlobalSynchronous(n.resourcesCtx) + executor := event.NewParallelExec() sys := event.NewSystem(n.log, executor) sys.AddTracer(event.NewMetricsTracer(n.metrics)) sys.Register("node", event.DeriverFunc(n.onEvent), event.DefaultRegisterOpts()) diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go index a06640086fde..4124382a9cc0 100644 --- a/op-node/rollup/derive/pipeline.go +++ b/op-node/rollup/derive/pipeline.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "sync/atomic" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" @@ -58,7 +59,7 
@@ type DerivationPipeline struct { // Index of the stage that is currently being reset. // >= len(stages) if no additional resetting is required - resetting int + resetting atomic.Int32 stages []ResettableStage // Special stages to keep track of @@ -70,7 +71,8 @@ type DerivationPipeline struct { origin eth.L1BlockRef resetL2Safe eth.L2BlockRef resetSysConfig eth.SystemConfig - engineIsReset bool + // engineIsReset is set to true by ConfirmEngineReset and cleared again by Reset + engineIsReset atomic.Bool metrics Metrics } @@ -100,7 +102,6 @@ func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L rollupCfg: rollupCfg, l1Fetcher: l1Fetcher, altDA: altDA, - resetting: 0, stages: stages, metrics: metrics, traversal: l1Traversal, @@ -112,14 +113,14 @@ func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L // DerivationReady returns true if the derivation pipeline is ready to be used. // When it's being reset its state is inconsistent, and should not be used externally. func (dp *DerivationPipeline) DerivationReady() bool { - return dp.engineIsReset && dp.resetting > 0 + return dp.engineIsReset.Load() && dp.resetting.Load() > 0 } func (dp *DerivationPipeline) Reset() { - dp.resetting = 0 + dp.resetting.Store(0) dp.resetSysConfig = eth.SystemConfig{} dp.resetL2Safe = eth.L2BlockRef{} - dp.engineIsReset = false + dp.engineIsReset.Store(false) } // Origin is the L1 block of the inner-most stage of the derivation pipeline, @@ -145,8 +146,8 @@ func (dp *DerivationPipeline) Step(ctx context.Context, pendingSafeHead eth.L2Bl }() // if any stages need to be reset, do that first. 
- if dp.resetting < len(dp.stages) { - if !dp.engineIsReset { + if dp.resetting.Load() < int32(len(dp.stages)) { + if !dp.engineIsReset.Load() { return nil, NewResetError(errors.New("cannot continue derivation until Engine has been reset")) } @@ -159,12 +160,13 @@ func (dp *DerivationPipeline) Step(ctx context.Context, pendingSafeHead eth.L2Bl } } - if err := dp.stages[dp.resetting].Reset(ctx, dp.origin, dp.resetSysConfig); err == io.EOF { - dp.log.Debug("reset of stage completed", "stage", dp.resetting, "origin", dp.origin) - dp.resetting += 1 + resetting := dp.resetting.Load() /* one snapshot, reused for the stage index, the log line and the error below */ + if err := dp.stages[resetting].Reset(ctx, dp.origin, dp.resetSysConfig); err == io.EOF { + dp.log.Debug("reset of stage completed", "stage", resetting, "origin", dp.origin) + dp.resetting.Add(1) return nil, nil } else if err != nil { - return nil, fmt.Errorf("stage %d failed resetting: %w", dp.resetting, err) + return nil, fmt.Errorf("stage %d failed resetting: %w", resetting, err) } else { return nil, nil } @@ -239,5 +241,5 @@ func (dp *DerivationPipeline) initialReset(ctx context.Context, resetL2Safe eth. } func (dp *DerivationPipeline) ConfirmEngineReset() { - dp.engineIsReset = true + dp.engineIsReset.Store(true) } diff --git a/op-node/rollup/driver/state.go b/op-node/rollup/driver/state.go index 2840cedcf423..a958062ef359 100644 --- a/op-node/rollup/driver/state.go +++ b/op-node/rollup/driver/state.go @@ -196,6 +196,8 @@ func (s *Driver) eventLoop() { defer altSyncTicker.Stop() lastUnsafeL2 := s.Engine.UnsafeL2Head() + var currentNextDelayedStep <-chan time.Time /* nil until the scheduler publishes a timer; receiving from a nil channel never fires in select */ + for { if s.driverCtx.Err() != nil { // don't try to schedule/handle more work when we are closing. return @@ -224,6 +226,7 @@ func (s *Driver) eventLoop() { select { case <-sequencerCh: s.Emitter.Emit(sequencing.SequencerActionEvent{}) + case <-s.sequencer.CheckNextAction(): case <-altSyncTicker.C: // Check if there is a gap in the current unsafe payload queue. 
ctx, cancel := context.WithTimeout(s.driverCtx, time.Second*2) @@ -262,7 +265,9 @@ func (s *Driver) eventLoop() { case newL1Finalized := <-s.l1FinalizedSig: s.emitter.Emit(finality.FinalizeL1Event{FinalizedL1: newL1Finalized}) reqStep() // we may be able to mark more L2 data as finalized now - case <-s.sched.NextDelayedStep(): + case req := <-s.sched.UpdateDelayedStepReq: + currentNextDelayedStep = req + case <-currentNextDelayedStep: s.emitter.Emit(StepAttemptEvent{}) case <-s.sched.NextStep(): s.emitter.Emit(StepAttemptEvent{}) diff --git a/op-node/rollup/driver/steps.go b/op-node/rollup/driver/steps.go index 0afe95941249..1d8a9cc92265 100644 --- a/op-node/rollup/driver/steps.go +++ b/op-node/rollup/driver/steps.go @@ -62,7 +62,8 @@ type StepSchedulingDeriver struct { bOffStrategy retry.Strategy // channel, nil by default (not firing), but used to schedule re-attempts with delay - delayedStepReq <-chan time.Time + delayedStepReq <-chan time.Time + UpdateDelayedStepReq chan (<-chan time.Time) // stepReqCh is used to request that the driver attempts to step forward by one L1 block. 
stepReqCh chan struct{} @@ -74,11 +75,12 @@ type StepSchedulingDeriver struct { func NewStepSchedulingDeriver(log log.Logger) *StepSchedulingDeriver { return &StepSchedulingDeriver{ - stepAttempts: 0, - bOffStrategy: retry.Exponential(), - stepReqCh: make(chan struct{}, 1), - delayedStepReq: nil, - log: log, + stepAttempts: 0, + bOffStrategy: retry.Exponential(), + stepReqCh: make(chan struct{}, 1), + delayedStepReq: nil, + log: log, + UpdateDelayedStepReq: make(chan (<-chan time.Time), 1), } } @@ -102,6 +104,11 @@ func (s *StepSchedulingDeriver) NextDelayedStep() <-chan time.Time { func (s *StepSchedulingDeriver) OnEvent(ev event.Event) bool { step := func() { s.delayedStepReq = nil + select { + case s.UpdateDelayedStepReq <- s.delayedStepReq: + default: + } + select { case s.stepReqCh <- struct{}{}: // Don't deadlock if the channel is already full @@ -113,6 +120,10 @@ func (s *StepSchedulingDeriver) OnEvent(ev event.Event) bool { case StepDelayedReqEvent: if s.delayedStepReq == nil { s.delayedStepReq = time.After(x.Delay) + select { + case s.UpdateDelayedStepReq <- s.delayedStepReq: + default: + } } case StepReqEvent: if x.ResetBackoff { @@ -124,6 +135,10 @@ func (s *StepSchedulingDeriver) OnEvent(ev event.Event) bool { delay := s.bOffStrategy.Duration(s.stepAttempts) s.log.Debug("scheduling re-attempt with delay", "attempts", s.stepAttempts, "delay", delay) s.delayedStepReq = time.After(delay) + select { + case s.UpdateDelayedStepReq <- s.delayedStepReq: + default: + } } else { s.log.Debug("ignoring step request, already scheduled re-attempt after previous failure", "attempts", s.stepAttempts) } @@ -133,6 +148,10 @@ func (s *StepSchedulingDeriver) OnEvent(ev event.Event) bool { case StepAttemptEvent: // clear the delayed-step channel s.delayedStepReq = nil + select { + case s.UpdateDelayedStepReq <- s.delayedStepReq: + default: + } if s.stepAttempts > 0 { s.log.Debug("Running step retry", "attempts", s.stepAttempts) } diff --git 
a/op-node/rollup/event/executor_parallel.go b/op-node/rollup/event/executor_parallel.go new file mode 100644 index 000000000000..d3d7e3b83867 --- /dev/null +++ b/op-node/rollup/event/executor_parallel.go @@ -0,0 +1,120 @@ +package event + +import ( + "context" + "slices" + "sync" + "sync/atomic" +) + +type ParallelExec struct { + workers []*worker + mu sync.RWMutex +} + +var _ Executor = (*ParallelExec)(nil) + +func NewParallelExec() *ParallelExec { + return &ParallelExec{} +} + +func (p *ParallelExec) Add(d Executable, opts *ExecutorOpts) (leaveExecutor func()) { + p.mu.Lock() + defer p.mu.Unlock() + w := newWorker(p, d, opts) + p.workers = append(p.workers, w) + return w.leave +} + +func (p *ParallelExec) remove(w *worker) { + p.mu.Lock() + defer p.mu.Unlock() + // Linear search to delete is fine, + // since we delete much less frequently than we process events with these. + for i, v := range p.workers { + if v == w { + p.workers = slices.Delete(p.workers, i, i+1) + return + } + } +} + +func (p *ParallelExec) Enqueue(ev AnnotatedEvent) error { + p.mu.RLock() + defer p.mu.RUnlock() + for _, w := range p.workers { + w.enqueue(ev) // this will block if its capacity is full, providing back-pressure to the Enqueue caller + } + return nil +} + +func (p *ParallelExec) Drain() error { + return nil +} +func (p *ParallelExec) DrainUntil(fn func(ev Event) bool, excl bool) error { + return nil +} + +type worker struct { + // ctx signals when the worker is exiting. + // No additional events will be accepted after cancellation. + ctx context.Context + cancel context.CancelFunc + + // closed as channel is closed upon exit of the run loop + closed chan struct{} + + // ingress is the buffered channel of events to process + ingress chan AnnotatedEvent + + // d is the underlying executable to process events on + d Executable + + // p is a reference to the ParallelExec that owns this worker. + // The worker removes itself from this upon leaving. 
+ p atomic.Pointer[ParallelExec] +} + +func newWorker(p *ParallelExec, d Executable, opts *ExecutorOpts) *worker { + ctx, cancel := context.WithCancel(context.Background()) + w := &worker{ + ctx: ctx, + cancel: cancel, + closed: make(chan struct{}), + ingress: make(chan AnnotatedEvent, opts.Capacity), + d: d, + } + w.p.Store(p) + go w.run() + return w +} + +func (w *worker) enqueue(ev AnnotatedEvent) { + select { + case <-w.ctx.Done(): + case w.ingress <- ev: + } + +} + +func (w *worker) leave() { + w.cancel() + if old := w.p.Swap(nil); old != nil { + // remove from worker pool + old.remove(w) + } + // wait for run loop to exit + <-w.closed +} + +func (w *worker) run() { + for { + select { + case <-w.ctx.Done(): + close(w.closed) + return + case ev := <-w.ingress: + w.d.RunEvent(ev) + } + } +} diff --git a/op-node/rollup/event/system.go b/op-node/rollup/event/system.go index 566f28fdbe40..7b67258fd2b0 100644 --- a/op-node/rollup/event/system.go +++ b/op-node/rollup/event/system.go @@ -57,7 +57,7 @@ type systemActor struct { leaveExecutor func() // 0 if event does not originate from Deriver-handling of another event - currentEvent uint64 + currentEvent atomic.Uint64 } // Emit is called by the end-user @@ -65,7 +65,7 @@ func (r *systemActor) Emit(ev Event) { if r.ctx.Err() != nil { return } - r.sys.emit(r.name, r.currentEvent, ev) + r.sys.emit(r.name, r.currentEvent.Load(), ev) } // RunEvent is called by the events executor. 
@@ -82,13 +82,13 @@ func (r *systemActor) RunEvent(ev AnnotatedEvent) { return } - prev := r.currentEvent + prev := r.currentEvent.Load() start := time.Now() - r.currentEvent = r.sys.recordDerivStart(r.name, ev, start) + r.currentEvent.Store(r.sys.recordDerivStart(r.name, ev, start)) effect := r.deriv.OnEvent(ev.Event) elapsed := time.Since(start) - r.sys.recordDerivEnd(r.name, ev, r.currentEvent, start, elapsed, effect) - r.currentEvent = prev + r.sys.recordDerivEnd(r.name, ev, r.currentEvent.Load(), start, elapsed, effect) + r.currentEvent.Store(prev) } // Sys is the canonical implementation of System. @@ -141,7 +141,7 @@ func (s *Sys) Register(name string, deriver Deriver, opts *RegisterOpts) Emitter if opts.Emitter.Limiting { limitedCallback := opts.Emitter.OnLimited em = NewLimiter(ctx, r, opts.Emitter.Rate, opts.Emitter.Burst, func() { - r.sys.recordRateLimited(name, r.currentEvent) + r.sys.recordRateLimited(name, r.currentEvent.Load()) if limitedCallback != nil { limitedCallback() } diff --git a/op-node/rollup/sequencing/disabled.go b/op-node/rollup/sequencing/disabled.go index 3634284ccd2f..3ef6d7b2ca73 100644 --- a/op-node/rollup/sequencing/disabled.go +++ b/op-node/rollup/sequencing/disabled.go @@ -24,6 +24,8 @@ func (ds DisabledSequencer) NextAction() (t time.Time, ok bool) { return time.Time{}, false } +func (ds DisabledSequencer) CheckNextAction() <-chan struct{} { return nil } + func (ds DisabledSequencer) Active() bool { return false } diff --git a/op-node/rollup/sequencing/iface.go b/op-node/rollup/sequencing/iface.go index 54e0c70719e0..533b828b0d5b 100644 --- a/op-node/rollup/sequencing/iface.go +++ b/op-node/rollup/sequencing/iface.go @@ -13,6 +13,8 @@ type SequencerIface interface { event.Deriver // NextAction returns when the sequencer needs to do the next change, and iff it should do so. 
NextAction() (t time.Time, ok bool) + // CheckNextAction returns a channel that is signaled after the next-action schedule (`nextAction` / `nextActionOK`) may have changed + CheckNextAction() <-chan struct{} Active() bool Init(ctx context.Context, active bool) error Start(ctx context.Context, head common.Hash) error diff --git a/op-node/rollup/sequencing/sequencer.go b/op-node/rollup/sequencing/sequencer.go index 27137f6300c9..80121784676a 100644 --- a/op-node/rollup/sequencing/sequencer.go +++ b/op-node/rollup/sequencing/sequencer.go @@ -110,6 +110,7 @@ type Sequencer struct { // nextAction is when the next sequencing action should be performed nextAction time.Time nextActionOK bool + nextActionCh chan struct{} /* capacity-1 signal channel; sends are non-blocking, so a pending signal is not duplicated */ latest BuildingState latestSealed eth.L2BlockRef @@ -143,6 +144,7 @@ func NewSequencer(driverCtx context.Context, log log.Logger, rollupCfg *rollup.C metrics: metrics, timeNow: time.Now, toBlockRef: derive.PayloadToBlockRef, + nextActionCh: make(chan struct{}, 1), } } @@ -232,6 +234,14 @@ func (d *Sequencer) onBuildStarted(x engine.BuildStartedEvent) { // finish with margin of sealing duration before payloadTime d.nextAction = payloadTime.Add(-sealingDuration) } + select { + case d.nextActionCh <- struct{}{}: + default: + } +} + +func (d *Sequencer) CheckNextAction() <-chan struct{} { + return d.nextActionCh } func (d *Sequencer) handleInvalid() { @@ -242,6 +252,10 @@ func (d *Sequencer) handleInvalid() { blockTime := time.Duration(d.rollupCfg.BlockTime) * time.Second d.nextAction = d.timeNow().Add(blockTime) d.nextActionOK = d.active.Load() + select { + case d.nextActionCh <- struct{}{}: + default: + } } func (d *Sequencer) onInvalidPayloadAttributes(x engine.InvalidPayloadAttributesEvent) { @@ -395,6 +409,10 @@ func (d *Sequencer) onEngineTemporaryError(x rollup.EngineTemporaryErrorEvent) { d.nextAction = d.timeNow().Add(time.Second) } d.nextActionOK = d.active.Load() + select { + case d.nextActionCh <- struct{}{}: + default: + } // We don't explicitly cancel block building jobs upon temporary errors: we may 
still finish the block (if any). // Any unfinished block building work eventually times out, and will be cleaned up that way. // Note that this only applies to temporary errors upon starting a block-building job. @@ -425,6 +443,10 @@ func (d *Sequencer) onEngineResetConfirmedEvent(x engine.EngineResetConfirmedEve // assuming the execution-engine just churned through some work for the reset. // This will also prevent any potential reset-loop from running too hot. d.nextAction = d.timeNow().Add(time.Second * time.Duration(d.rollupCfg.BlockTime)) + select { + case d.nextActionCh <- struct{}{}: + default: + } d.log.Info("Engine reset confirmed, sequencer may continue", "next", d.nextActionOK) } @@ -462,6 +484,10 @@ func (d *Sequencer) onForkchoiceUpdate(x engine.ForkchoiceUpdateEvent) { // otherwise start instantly d.nextAction = now } + select { + case d.nextActionCh <- struct{}{}: + default: + } } d.setLatestHead(x.UnsafeL2Head) } @@ -656,6 +682,10 @@ func (d *Sequencer) forceStart() error { d.latest = BuildingState{} d.nextActionOK = true d.nextAction = d.timeNow() + select { + case d.nextActionCh <- struct{}{}: + default: + } d.active.Store(true) d.log.Info("Sequencer has been started", "next action", d.nextAction) return nil diff --git a/op-node/rollup/status/status.go b/op-node/rollup/status/status.go index 65121b1294aa..ef6254c0876e 100644 --- a/op-node/rollup/status/status.go +++ b/op-node/rollup/status/status.go @@ -1,11 +1,12 @@ package status import ( + "context" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" "sync" "sync/atomic" - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/engine" @@ -35,6 +36,10 @@ type Metrics interface { RecordL1Ref(name string, ref eth.L1BlockRef) } +type L2 interface { + PayloadByHash(context.Context, common.Hash) 
(*eth.ExecutionPayloadEnvelope, error) +} + type StatusTracker struct { data eth.SyncStatus diff --git a/op-service/metrics/ref_metrics.go b/op-service/metrics/ref_metrics.go index 19d8badb9ab7..f91e818be537 100644 --- a/op-service/metrics/ref_metrics.go +++ b/op-service/metrics/ref_metrics.go @@ -2,6 +2,7 @@ package metrics import ( "encoding/binary" + "sync" "time" "github.com/ethereum-optimism/optimism/op-service/eth" @@ -28,7 +29,7 @@ type RefMetrics struct { RefsLatency *prometheus.GaugeVec // hash of the last seen block per name, so we don't reduce/increase latency on updates of the same data, // and only count the first occurrence - LatencySeen map[string]common.Hash + LatencySeen sync.Map } var _ RefMetricer = (*RefMetrics)(nil) @@ -80,7 +81,6 @@ func MakeRefMetrics(ns string, factory Factory) RefMetrics { "layer", "type", }), - LatencySeen: make(map[string]common.Hash), } } @@ -89,8 +89,9 @@ func (m *RefMetrics) RecordRef(layer string, name string, num uint64, timestamp if timestamp != 0 { m.RefsTime.WithLabelValues(layer, name).Set(float64(timestamp)) // only meter the latency when we first see this hash for the given label name - if m.LatencySeen[name] != h { - m.LatencySeen[name] = h + value, ok := m.LatencySeen.Load(name) /* ok is false until a hash has been stored for this name */ + if !ok || value != h { + m.LatencySeen.Store(name, h) m.RefsLatency.WithLabelValues(layer, name).Set(float64(timestamp) - (float64(time.Now().UnixNano()) / 1e9)) } }