Skip to content

Commit

Permalink
op-node: parallel events executor (#4)
Browse files Browse the repository at this point in the history
* run op-node in parallel-event mode

* add metrics and fix data race

* delete debug log

* delete metrics

* recover executor_global_test.go

* modify the variable type and function name

* modify comment
  • Loading branch information
claymega committed Dec 17, 2024
1 parent e8cf586 commit a86b645
Show file tree
Hide file tree
Showing 11 changed files with 220 additions and 34 deletions.
2 changes: 1 addition & 1 deletion op-node/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (n *OpNode) init(ctx context.Context, cfg *Config) error {

func (n *OpNode) initEventSystem() {
// This executor will be configurable in the future, for parallel event processing
executor := event.NewGlobalSynchronous(n.resourcesCtx)
executor := event.NewParallelExec()
sys := event.NewSystem(n.log, executor)
sys.AddTracer(event.NewMetricsTracer(n.metrics))
sys.Register("node", event.DeriverFunc(n.onEvent), event.DefaultRegisterOpts())
Expand Down
28 changes: 15 additions & 13 deletions op-node/rollup/derive/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"sync/atomic"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
Expand Down Expand Up @@ -58,7 +59,7 @@ type DerivationPipeline struct {

// Index of the stage that is currently being reset.
// >= len(stages) if no additional resetting is required
resetting int
resetting atomic.Int32
stages []ResettableStage

// Special stages to keep track of
Expand All @@ -70,7 +71,8 @@ type DerivationPipeline struct {
origin eth.L1BlockRef
resetL2Safe eth.L2BlockRef
resetSysConfig eth.SystemConfig
engineIsReset bool
// Its value is only 1 or 0
engineIsReset atomic.Bool

metrics Metrics
}
Expand Down Expand Up @@ -100,7 +102,6 @@ func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L
rollupCfg: rollupCfg,
l1Fetcher: l1Fetcher,
altDA: altDA,
resetting: 0,
stages: stages,
metrics: metrics,
traversal: l1Traversal,
Expand All @@ -112,14 +113,14 @@ func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L
// DerivationReady returns true if the derivation pipeline is ready to be used.
// When it's being reset its state is inconsistent, and should not be used externally.
func (dp *DerivationPipeline) DerivationReady() bool {
return dp.engineIsReset && dp.resetting > 0
return dp.engineIsReset.Load() && dp.resetting.Load() > 0
}

func (dp *DerivationPipeline) Reset() {
dp.resetting = 0
dp.resetting.Store(0)
dp.resetSysConfig = eth.SystemConfig{}
dp.resetL2Safe = eth.L2BlockRef{}
dp.engineIsReset = false
dp.engineIsReset.Store(false)
}

// Origin is the L1 block of the inner-most stage of the derivation pipeline,
Expand All @@ -145,8 +146,8 @@ func (dp *DerivationPipeline) Step(ctx context.Context, pendingSafeHead eth.L2Bl
}()

// if any stages need to be reset, do that first.
if dp.resetting < len(dp.stages) {
if !dp.engineIsReset {
if dp.resetting.Load() < int32(len(dp.stages)) {
if !dp.engineIsReset.Load() {
return nil, NewResetError(errors.New("cannot continue derivation until Engine has been reset"))
}

Expand All @@ -159,12 +160,13 @@ func (dp *DerivationPipeline) Step(ctx context.Context, pendingSafeHead eth.L2Bl
}
}

if err := dp.stages[dp.resetting].Reset(ctx, dp.origin, dp.resetSysConfig); err == io.EOF {
dp.log.Debug("reset of stage completed", "stage", dp.resetting, "origin", dp.origin)
dp.resetting += 1
resetting := dp.resetting.Load()
if err := dp.stages[resetting].Reset(ctx, dp.origin, dp.resetSysConfig); err == io.EOF {
dp.log.Debug("reset of stage completed", "stage", resetting, "origin", dp.origin)
dp.resetting.Add(1)
return nil, nil
} else if err != nil {
return nil, fmt.Errorf("stage %d failed resetting: %w", dp.resetting, err)
return nil, fmt.Errorf("stage %d failed resetting: %w", dp.resetting.Load(), err)
} else {
return nil, nil
}
Expand Down Expand Up @@ -239,5 +241,5 @@ func (dp *DerivationPipeline) initialReset(ctx context.Context, resetL2Safe eth.
}

func (dp *DerivationPipeline) ConfirmEngineReset() {
dp.engineIsReset = true
dp.engineIsReset.Store(true)
}
7 changes: 6 additions & 1 deletion op-node/rollup/driver/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ func (s *Driver) eventLoop() {
defer altSyncTicker.Stop()
lastUnsafeL2 := s.Engine.UnsafeL2Head()

var currentNextDelayedStep <-chan time.Time = nil

for {
if s.driverCtx.Err() != nil { // don't try to schedule/handle more work when we are closing.
return
Expand Down Expand Up @@ -224,6 +226,7 @@ func (s *Driver) eventLoop() {
select {
case <-sequencerCh:
s.Emitter.Emit(sequencing.SequencerActionEvent{})
case <-s.sequencer.CheckNextAction():
case <-altSyncTicker.C:
// Check if there is a gap in the current unsafe payload queue.
ctx, cancel := context.WithTimeout(s.driverCtx, time.Second*2)
Expand Down Expand Up @@ -262,7 +265,9 @@ func (s *Driver) eventLoop() {
case newL1Finalized := <-s.l1FinalizedSig:
s.emitter.Emit(finality.FinalizeL1Event{FinalizedL1: newL1Finalized})
reqStep() // we may be able to mark more L2 data as finalized now
case <-s.sched.NextDelayedStep():
case req := <-s.sched.UpdateDelayedStepReq:
currentNextDelayedStep = req
case <-currentNextDelayedStep:
s.emitter.Emit(StepAttemptEvent{})
case <-s.sched.NextStep():
s.emitter.Emit(StepAttemptEvent{})
Expand Down
31 changes: 25 additions & 6 deletions op-node/rollup/driver/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ type StepSchedulingDeriver struct {
bOffStrategy retry.Strategy

// channel, nil by default (not firing), but used to schedule re-attempts with delay
delayedStepReq <-chan time.Time
delayedStepReq <-chan time.Time
UpdateDelayedStepReq chan (<-chan time.Time)

// stepReqCh is used to request that the driver attempts to step forward by one L1 block.
stepReqCh chan struct{}
Expand All @@ -74,11 +75,12 @@ type StepSchedulingDeriver struct {

func NewStepSchedulingDeriver(log log.Logger) *StepSchedulingDeriver {
return &StepSchedulingDeriver{
stepAttempts: 0,
bOffStrategy: retry.Exponential(),
stepReqCh: make(chan struct{}, 1),
delayedStepReq: nil,
log: log,
stepAttempts: 0,
bOffStrategy: retry.Exponential(),
stepReqCh: make(chan struct{}, 1),
delayedStepReq: nil,
log: log,
UpdateDelayedStepReq: make(chan (<-chan time.Time), 1),
}
}

Expand All @@ -102,6 +104,11 @@ func (s *StepSchedulingDeriver) NextDelayedStep() <-chan time.Time {
func (s *StepSchedulingDeriver) OnEvent(ev event.Event) bool {
step := func() {
s.delayedStepReq = nil
select {
case s.UpdateDelayedStepReq <- s.delayedStepReq:
default:
}

select {
case s.stepReqCh <- struct{}{}:
// Don't deadlock if the channel is already full
Expand All @@ -113,6 +120,10 @@ func (s *StepSchedulingDeriver) OnEvent(ev event.Event) bool {
case StepDelayedReqEvent:
if s.delayedStepReq == nil {
s.delayedStepReq = time.After(x.Delay)
select {
case s.UpdateDelayedStepReq <- s.delayedStepReq:
default:
}
}
case StepReqEvent:
if x.ResetBackoff {
Expand All @@ -124,6 +135,10 @@ func (s *StepSchedulingDeriver) OnEvent(ev event.Event) bool {
delay := s.bOffStrategy.Duration(s.stepAttempts)
s.log.Debug("scheduling re-attempt with delay", "attempts", s.stepAttempts, "delay", delay)
s.delayedStepReq = time.After(delay)
select {
case s.UpdateDelayedStepReq <- s.delayedStepReq:
default:
}
} else {
s.log.Debug("ignoring step request, already scheduled re-attempt after previous failure", "attempts", s.stepAttempts)
}
Expand All @@ -133,6 +148,10 @@ func (s *StepSchedulingDeriver) OnEvent(ev event.Event) bool {
case StepAttemptEvent:
// clear the delayed-step channel
s.delayedStepReq = nil
select {
case s.UpdateDelayedStepReq <- s.delayedStepReq:
default:
}
if s.stepAttempts > 0 {
s.log.Debug("Running step retry", "attempts", s.stepAttempts)
}
Expand Down
120 changes: 120 additions & 0 deletions op-node/rollup/event/executor_parallel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package event

import (
"context"
"slices"
"sync"
"sync/atomic"
)

type ParallelExec struct {
workers []*worker
mu sync.RWMutex
}

var _ Executor = (*ParallelExec)(nil)

func NewParallelExec() *ParallelExec {
return &ParallelExec{}
}

func (p *ParallelExec) Add(d Executable, opts *ExecutorOpts) (leaveExecutor func()) {
p.mu.Lock()
defer p.mu.Unlock()
w := newWorker(p, d, opts)
p.workers = append(p.workers, w)
return w.leave
}

func (p *ParallelExec) remove(w *worker) {
p.mu.Lock()
defer p.mu.Unlock()
// Linear search to delete is fine,
// since we delete much less frequently than we process events with these.
for i, v := range p.workers {
if v == w {
p.workers = slices.Delete(p.workers, i, i+1)
return
}
}
}

func (p *ParallelExec) Enqueue(ev AnnotatedEvent) error {
p.mu.RLock()
defer p.mu.RUnlock()
for _, w := range p.workers {
w.enqueue(ev) // this will block if its capacity is full, providing back-pressure to the Enqueue caller
}
return nil
}

func (p *ParallelExec) Drain() error {
return nil
}
func (p *ParallelExec) DrainUntil(fn func(ev Event) bool, excl bool) error {
return nil
}

type worker struct {
// ctx signals when the worker is exiting.
// No additional events will be accepted after cancellation.
ctx context.Context
cancel context.CancelFunc

// closed as channel is closed upon exit of the run loop
closed chan struct{}

// ingress is the buffered channel of events to process
ingress chan AnnotatedEvent

// d is the underlying executable to process events on
d Executable

// p is a reference to the ParallelExec that owns this worker.
// The worker removes itself from this upon leaving.
p atomic.Pointer[ParallelExec]
}

func newWorker(p *ParallelExec, d Executable, opts *ExecutorOpts) *worker {
ctx, cancel := context.WithCancel(context.Background())
w := &worker{
ctx: ctx,
cancel: cancel,
closed: make(chan struct{}),
ingress: make(chan AnnotatedEvent, opts.Capacity),
d: d,
}
w.p.Store(p)
go w.run()
return w
}

func (w *worker) enqueue(ev AnnotatedEvent) {
select {
case <-w.ctx.Done():
case w.ingress <- ev:
}

}

func (w *worker) leave() {
w.cancel()
if old := w.p.Swap(nil); old != nil {
// remove from worker pool
old.remove(w)
}
// wait for run loop to exit
<-w.closed
}

func (w *worker) run() {
for {
select {
case <-w.ctx.Done():
close(w.closed)
return
case ev := <-w.ingress:
w.d.RunEvent(ev)
}
}
}
14 changes: 7 additions & 7 deletions op-node/rollup/event/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,15 @@ type systemActor struct {
leaveExecutor func()

// 0 if event does not originate from Deriver-handling of another event
currentEvent uint64
currentEvent atomic.Uint64
}

// Emit is called by the end-user
func (r *systemActor) Emit(ev Event) {
if r.ctx.Err() != nil {
return
}
r.sys.emit(r.name, r.currentEvent, ev)
r.sys.emit(r.name, r.currentEvent.Load(), ev)
}

// RunEvent is called by the events executor.
Expand All @@ -82,13 +82,13 @@ func (r *systemActor) RunEvent(ev AnnotatedEvent) {
return
}

prev := r.currentEvent
prev := r.currentEvent.Load()
start := time.Now()
r.currentEvent = r.sys.recordDerivStart(r.name, ev, start)
r.currentEvent.Store(r.sys.recordDerivStart(r.name, ev, start))
effect := r.deriv.OnEvent(ev.Event)
elapsed := time.Since(start)
r.sys.recordDerivEnd(r.name, ev, r.currentEvent, start, elapsed, effect)
r.currentEvent = prev
r.sys.recordDerivEnd(r.name, ev, r.currentEvent.Load(), start, elapsed, effect)
r.currentEvent.Store(prev)
}

// Sys is the canonical implementation of System.
Expand Down Expand Up @@ -141,7 +141,7 @@ func (s *Sys) Register(name string, deriver Deriver, opts *RegisterOpts) Emitter
if opts.Emitter.Limiting {
limitedCallback := opts.Emitter.OnLimited
em = NewLimiter(ctx, r, opts.Emitter.Rate, opts.Emitter.Burst, func() {
r.sys.recordRateLimited(name, r.currentEvent)
r.sys.recordRateLimited(name, r.currentEvent.Load())
if limitedCallback != nil {
limitedCallback()
}
Expand Down
2 changes: 2 additions & 0 deletions op-node/rollup/sequencing/disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ func (ds DisabledSequencer) NextAction() (t time.Time, ok bool) {
return time.Time{}, false
}

func (ds DisabledSequencer) CheckNextAction() <-chan struct{} { return nil }

func (ds DisabledSequencer) Active() bool {
return false
}
Expand Down
2 changes: 2 additions & 0 deletions op-node/rollup/sequencing/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ type SequencerIface interface {
event.Deriver
// NextAction returns when the sequencer needs to do the next change, and iff it should do so.
NextAction() (t time.Time, ok bool)
// CheckNextAction is channel to await changes in the value of `nextActionOK`
CheckNextAction() <-chan struct{}
Active() bool
Init(ctx context.Context, active bool) error
Start(ctx context.Context, head common.Hash) error
Expand Down
Loading

0 comments on commit a86b645

Please sign in to comment.