op-node: parallel events executor #4

Merged · 6 commits · Oct 28, 2024
28 changes: 15 additions & 13 deletions op-node/rollup/derive/pipeline.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"sync/atomic"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
@@ -58,7 +59,7 @@ type DerivationPipeline struct {

// Index of the stage that is currently being reset.
// >= len(stages) if no additional resetting is required
resetting int
resetting atomic.Int32
stages []ResettableStage

// Special stages to keep track of
@@ -70,7 +71,8 @@ type DerivationPipeline struct {
origin eth.L1BlockRef
resetL2Safe eth.L2BlockRef
resetSysConfig eth.SystemConfig
Comment on lines 71 to 73
Collaborator
Should we add a mutex for these fields as well? Will they be modified concurrently by multiple goroutines?

Collaborator Author
These fields are internal variables and are only modified by Step(), and Step() is only called in the pipeline goroutine, not by the other handler goroutines.

Collaborator
@Troublor Troublor Oct 23, 2024
@claymega I see the Reset() function also modifies these two variables, and Reset() may also be called by the main event loop. Can you confirm this will not cause a race?

Collaborator Author
Right, Reset() also modifies these two variables. I re-checked: Reset() is also only called in the pipeline goroutine.
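To make the pattern described in this thread concrete, here is a minimal, self-contained sketch (hypothetical type with simplified field types, not the real DerivationPipeline): fields touched only by the pipeline goroutine stay plain, while fields read from other goroutines are atomics.

package pipelinesketch

import "sync/atomic"

// sketchPipeline mirrors the split used in pipeline.go:
// plain fields are confined to the pipeline goroutine (Step/Reset),
// atomic fields may also be read from other goroutines (DerivationReady).
type sketchPipeline struct {
    resetSysConfig string // pipeline goroutine only
    resetL2Safe    string // pipeline goroutine only

    resetting     atomic.Int32 // read cross-goroutine
    engineIsReset atomic.Bool  // read cross-goroutine
}

// step and reset both run on the single pipeline goroutine, so the plain fields need no mutex.
func (p *sketchPipeline) step()  { p.resetSysConfig = "set-by-step" }
func (p *sketchPipeline) reset() { p.resetSysConfig = ""; p.resetL2Safe = ""; p.resetting.Store(0) }

// ready may be called from any goroutine; it only loads atomics.
func (p *sketchPipeline) ready() bool { return p.engineIsReset.Load() && p.resetting.Load() > 0 }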

engineIsReset bool
// Accessed atomically, since it may be read from multiple goroutines
engineIsReset atomic.Bool

metrics Metrics
}
@@ -100,7 +102,6 @@ func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L
rollupCfg: rollupCfg,
l1Fetcher: l1Fetcher,
altDA: altDA,
resetting: 0,
stages: stages,
metrics: metrics,
traversal: l1Traversal,
@@ -112,14 +113,14 @@
// DerivationReady returns true if the derivation pipeline is ready to be used.
// When it's being reset its state is inconsistent, and should not be used externally.
func (dp *DerivationPipeline) DerivationReady() bool {
return dp.engineIsReset && dp.resetting > 0
return dp.engineIsReset.Load() && dp.resetting.Load() > 0
}

func (dp *DerivationPipeline) Reset() {
dp.resetting = 0
dp.resetting.Store(0)
dp.resetSysConfig = eth.SystemConfig{}
dp.resetL2Safe = eth.L2BlockRef{}
dp.engineIsReset = false
dp.engineIsReset.Store(false)
}

// Origin is the L1 block of the inner-most stage of the derivation pipeline,
@@ -145,8 +146,8 @@ func (dp *DerivationPipeline) Step(ctx context.Context, pendingSafeHead eth.L2Bl
}()

// if any stages need to be reset, do that first.
if dp.resetting < len(dp.stages) {
if !dp.engineIsReset {
if dp.resetting.Load() < int32(len(dp.stages)) {
if !dp.engineIsReset.Load() {
return nil, NewResetError(errors.New("cannot continue derivation until Engine has been reset"))
}

@@ -159,12 +160,13 @@
}
}

if err := dp.stages[dp.resetting].Reset(ctx, dp.origin, dp.resetSysConfig); err == io.EOF {
dp.log.Debug("reset of stage completed", "stage", dp.resetting, "origin", dp.origin)
dp.resetting += 1
resetting := dp.resetting.Load()
if err := dp.stages[resetting].Reset(ctx, dp.origin, dp.resetSysConfig); err == io.EOF {
dp.log.Debug("reset of stage completed", "stage", resetting, "origin", dp.origin)
dp.resetting.Add(1)
return nil, nil
} else if err != nil {
return nil, fmt.Errorf("stage %d failed resetting: %w", dp.resetting, err)
return nil, fmt.Errorf("stage %d failed resetting: %w", dp.resetting.Load(), err)
} else {
return nil, nil
}
@@ -239,5 +241,5 @@ func (dp *DerivationPipeline) initialReset(ctx context.Context, resetL2Safe eth.
}

func (dp *DerivationPipeline) ConfirmEngineReset() {
dp.engineIsReset = true
dp.engineIsReset.Store(true)
}
5 changes: 2 additions & 3 deletions op-node/rollup/driver/driver.go
@@ -172,9 +172,8 @@ func NewDriver(
var drain func() error
// This instantiation will be one of more options: soon there will be a parallel events executor
{
s := event.NewGlobalSynchronous(driverCtx)
executor = s
drain = s.Drain
executor = event.NewParallelExec()
drain = func() error { return nil } // no-op
}
sys := event.NewSystem(log, executor)
sys.AddTracer(event.NewMetricsTracer(metrics))
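The surrounding comment notes that this instantiation "will be one of more options". A sketch of keeping both executors selectable inside NewDriver: the cfg.ParallelEvents flag is hypothetical and not part of this PR, while NewGlobalSynchronous and Drain are the calls removed by this hunk.

var executor event.Executor
var drain func() error
if cfg.ParallelEvents { // hypothetical config option
    executor = event.NewParallelExec()
    drain = func() error { return nil } // nothing to drain: workers process events concurrently
} else {
    s := event.NewGlobalSynchronous(driverCtx)
    executor = s
    drain = s.Drain
}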
7 changes: 6 additions & 1 deletion op-node/rollup/driver/state.go
@@ -199,6 +199,8 @@ func (s *Driver) eventLoop() {
defer altSyncTicker.Stop()
lastUnsafeL2 := s.Engine.UnsafeL2Head()

var currentNextDelayedStep <-chan time.Time = nil

for {
if s.driverCtx.Err() != nil { // don't try to schedule/handle more work when we are closing.
return
@@ -227,6 +229,7 @@
select {
case <-sequencerCh:
s.Emitter.Emit(sequencing.SequencerActionEvent{})
case <-s.sequencer.CheckNextAction():
case <-altSyncTicker.C:
// Check if there is a gap in the current unsafe payload queue.
ctx, cancel := context.WithTimeout(s.driverCtx, time.Second*2)
@@ -265,7 +268,9 @@
case newL1Finalized := <-s.l1FinalizedSig:
s.emitter.Emit(finality.FinalizeL1Event{FinalizedL1: newL1Finalized})
reqStep() // we may be able to mark more L2 data as finalized now
case <-s.sched.NextDelayedStep():
case req := <-s.sched.UpdateDelayedStepReq:
currentNextDelayedStep = req
case <-currentNextDelayedStep:
s.emitter.Emit(StepAttemptEvent{})
case <-s.sched.NextStep():
s.emitter.Emit(StepAttemptEvent{})
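The loop above selects on a timer channel variable that is swapped whenever the deriver publishes a new one (a nil channel simply never fires). A self-contained sketch of that pattern, with illustrative names only:

package main

import (
    "fmt"
    "time"
)

func main() {
    // updateDelayed plays the role of sched.UpdateDelayedStepReq.
    updateDelayed := make(chan (<-chan time.Time), 1)
    // currentDelayed plays the role of currentNextDelayedStep; while nil, its case never fires.
    var currentDelayed <-chan time.Time

    // Producer side: publish a new delayed-step timer without blocking.
    select {
    case updateDelayed <- time.After(100 * time.Millisecond):
    default:
    }

    for {
        select {
        case ch := <-updateDelayed:
            currentDelayed = ch // swap in the latest timer (nil would disarm the case below)
        case <-currentDelayed:
            fmt.Println("delayed step attempt")
            return
        }
    }
}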
31 changes: 25 additions & 6 deletions op-node/rollup/driver/steps.go
@@ -62,7 +62,8 @@ type StepSchedulingDeriver struct {
bOffStrategy retry.Strategy

// channel, nil by default (not firing), but used to schedule re-attempts with delay
delayedStepReq <-chan time.Time
delayedStepReq <-chan time.Time
UpdateDelayedStepReq chan (<-chan time.Time)

// stepReqCh is used to request that the driver attempts to step forward by one L1 block.
stepReqCh chan struct{}
@@ -74,11 +75,12 @@

func NewStepSchedulingDeriver(log log.Logger) *StepSchedulingDeriver {
return &StepSchedulingDeriver{
stepAttempts: 0,
bOffStrategy: retry.Exponential(),
stepReqCh: make(chan struct{}, 1),
delayedStepReq: nil,
log: log,
stepAttempts: 0,
bOffStrategy: retry.Exponential(),
stepReqCh: make(chan struct{}, 1),
delayedStepReq: nil,
log: log,
UpdateDelayedStepReq: make(chan (<-chan time.Time), 1),
}
}

@@ -102,6 +104,11 @@ func (s *StepSchedulingDeriver) NextDelayedStep() <-chan time.Time {
func (s *StepSchedulingDeriver) OnEvent(ev event.Event) bool {
step := func() {
s.delayedStepReq = nil
select {
case s.UpdateDelayedStepReq <- s.delayedStepReq:
default:
}

select {
case s.stepReqCh <- struct{}{}:
// Don't deadlock if the channel is already full
@@ -113,6 +120,10 @@
case StepDelayedReqEvent:
if s.delayedStepReq == nil {
s.delayedStepReq = time.After(x.Delay)
select {
case s.UpdateDelayedStepReq <- s.delayedStepReq:
default:
}
}
case StepReqEvent:
if x.ResetBackoff {
@@ -124,6 +135,10 @@
delay := s.bOffStrategy.Duration(s.stepAttempts)
s.log.Debug("scheduling re-attempt with delay", "attempts", s.stepAttempts, "delay", delay)
s.delayedStepReq = time.After(delay)
select {
case s.UpdateDelayedStepReq <- s.delayedStepReq:
default:
}
} else {
s.log.Debug("ignoring step request, already scheduled re-attempt after previous failure", "attempts", s.stepAttempts)
}
@@ -133,6 +148,10 @@
case StepAttemptEvent:
// clear the delayed-step channel
s.delayedStepReq = nil
select {
case s.UpdateDelayedStepReq <- s.delayedStepReq:
default:
}
if s.stepAttempts > 0 {
s.log.Debug("Running step retry", "attempts", s.stepAttempts)
}
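The non-blocking send on UpdateDelayedStepReq is now repeated at every site that modifies delayedStepReq. A small helper along these lines (hypothetical, not part of this PR) would keep those sites consistent:

// notifyDelayedStepChange publishes the current delayedStepReq channel to the event loop.
// The send is non-blocking: if the 1-slot buffer is already full, the notification is
// skipped, matching the inline pattern used at each call site above.
func (s *StepSchedulingDeriver) notifyDelayedStepChange() {
    select {
    case s.UpdateDelayedStepReq <- s.delayedStepReq:
    default:
    }
}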
113 changes: 113 additions & 0 deletions op-node/rollup/event/executor_parallel.go
@@ -0,0 +1,113 @@
package event

import (
"context"
"slices"
"sync"
"sync/atomic"
)

type ParallelExec struct {
workers []*worker
mu sync.RWMutex
}

var _ Executor = (*ParallelExec)(nil)

func NewParallelExec() *ParallelExec {
return &ParallelExec{}
}

func (p *ParallelExec) Add(d Executable, opts *ExecutorOpts) (leaveExecutor func()) {
p.mu.Lock()
defer p.mu.Unlock()
w := newWorker(p, d, opts)
p.workers = append(p.workers, w)
return w.leave
}

func (p *ParallelExec) remove(w *worker) {
p.mu.Lock()
defer p.mu.Unlock()
// Linear search to delete is fine,
// since we delete much less frequently than we process events with these.
for i, v := range p.workers {
if v == w {
p.workers = slices.Delete(p.workers, i, i+1)
return
}
}
}

func (p *ParallelExec) Enqueue(ev AnnotatedEvent) error {
p.mu.RLock()
defer p.mu.RUnlock()
for _, w := range p.workers {
w.enqueue(ev) // this will block if its capacity is full, providing back-pressure to the Enqueue caller
}
return nil
}

type worker struct {
// ctx signals when the worker is exiting.
// No additional events will be accepted after cancellation.
ctx context.Context
cancel context.CancelFunc

// closed is closed upon exit of the run loop
closed chan struct{}

// ingress is the buffered channel of events to process
ingress chan AnnotatedEvent

// d is the underlying executable to process events on
d Executable

// p is a reference to the ParallelExec that owns this worker.
// The worker removes itself from this upon leaving.
p atomic.Pointer[ParallelExec]
}

func newWorker(p *ParallelExec, d Executable, opts *ExecutorOpts) *worker {
ctx, cancel := context.WithCancel(context.Background())
w := &worker{
ctx: ctx,
cancel: cancel,
closed: make(chan struct{}),
ingress: make(chan AnnotatedEvent, opts.Capacity),
d: d,
}
w.p.Store(p)
go w.run()
return w
}

func (w *worker) enqueue(ev AnnotatedEvent) {
select {
case <-w.ctx.Done():
case w.ingress <- ev:
}

}

func (w *worker) leave() {
w.cancel()
if old := w.p.Swap(nil); old != nil {
// remove from worker pool
old.remove(w)
}
// wait for run loop to exit
<-w.closed
}

func (w *worker) run() {
for {
select {
case <-w.ctx.Done():
close(w.closed)
return
case ev := <-w.ingress:
w.d.RunEvent(ev)
}
}
}
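A hedged usage sketch of the new executor. It relies only on the shapes visible in this diff (Add/Enqueue, ExecutorOpts.Capacity, AnnotatedEvent.Event) and assumes the Event interface requires just a String() method; the import path and helper types below are illustrative.

package main

import (
    "fmt"
    "time"

    "github.com/ethereum-optimism/optimism/op-node/rollup/event" // assumed import path
)

// printer is a toy Executable: it just logs every event it receives.
type printer struct{ name string }

func (p *printer) RunEvent(ev event.AnnotatedEvent) {
    fmt.Println(p.name, "handled", ev.Event.String())
}

// demoEvent satisfies the Event interface (assumed to require only String()).
type demoEvent struct{}

func (demoEvent) String() string { return "demo-event" }

func main() {
    exec := event.NewParallelExec()

    // Each Add spawns a worker goroutine with its own buffered ingress channel.
    leave := exec.Add(&printer{name: "worker-A"}, &event.ExecutorOpts{Capacity: 4})
    defer leave() // cancels the worker and removes it from the pool

    // Enqueue fans the event out to every registered worker; it blocks only if a
    // worker's ingress buffer is full (back-pressure, as noted in the code above).
    _ = exec.Enqueue(event.AnnotatedEvent{Event: demoEvent{}})

    time.Sleep(50 * time.Millisecond) // crude wait for the async worker in this sketch
}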
14 changes: 7 additions & 7 deletions op-node/rollup/event/system.go
@@ -53,15 +53,15 @@ type systemActor struct {
leaveExecutor func()

// 0 if event does not originate from Deriver-handling of another event
currentEvent uint64
currentEvent atomic.Uint64
}

// Emit is called by the end-user
func (r *systemActor) Emit(ev Event) {
if r.ctx.Err() != nil {
return
}
r.sys.emit(r.name, r.currentEvent, ev)
r.sys.emit(r.name, r.currentEvent.Load(), ev)
}

// RunEvent is called by the events executor.
@@ -74,13 +74,13 @@ func (r *systemActor) RunEvent(ev AnnotatedEvent) {
return
}

prev := r.currentEvent
prev := r.currentEvent.Load()
start := time.Now()
r.currentEvent = r.sys.recordDerivStart(r.name, ev, start)
r.currentEvent.Store(r.sys.recordDerivStart(r.name, ev, start))
effect := r.deriv.OnEvent(ev.Event)
elapsed := time.Since(start)
r.sys.recordDerivEnd(r.name, ev, r.currentEvent, start, elapsed, effect)
r.currentEvent = prev
r.sys.recordDerivEnd(r.name, ev, r.currentEvent.Load(), start, elapsed, effect)
r.currentEvent.Store(prev)
}

// Sys is the canonical implementation of System.
@@ -130,7 +130,7 @@ func (s *Sys) Register(name string, deriver Deriver, opts *RegisterOpts) Emitter
if opts.Emitter.Limiting {
limitedCallback := opts.Emitter.OnLimited
em = NewLimiter(ctx, r, opts.Emitter.Rate, opts.Emitter.Burst, func() {
r.sys.recordRateLimited(name, r.currentEvent)
r.sys.recordRateLimited(name, r.currentEvent.Load())
if limitedCallback != nil {
limitedCallback()
}
2 changes: 2 additions & 0 deletions op-node/rollup/sequencing/disabled.go
@@ -24,6 +24,8 @@ func (ds DisabledSequencer) NextAction() (t time.Time, ok bool) {
return time.Time{}, false
}

func (ds DisabledSequencer) CheckNextAction() <-chan struct{} { return nil }

func (ds DisabledSequencer) Active() bool {
return false
}
2 changes: 2 additions & 0 deletions op-node/rollup/sequencing/iface.go
@@ -13,6 +13,8 @@ type SequencerIface interface {
event.Deriver
// NextAction returns when the sequencer needs to do the next change, and iff it should do so.
NextAction() (t time.Time, ok bool)
// CheckNextAction returns a channel used to await changes in the value of `nextActionOK`
CheckNextAction() <-chan struct{}
Active() bool
Init(ctx context.Context, active bool) error
Start(ctx context.Context, head common.Hash) error
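The sequencer's CheckNextAction implementation is not part of this diff; one plausible shape (an assumption, illustrative names) is a channel that is closed and re-armed whenever the guarded nextActionOK value changes:

package sequencersketch

import "sync"

// actionSignal guards a boolean and lets callers wait for the next change to it.
type actionSignal struct {
    mu     sync.Mutex
    ok     bool
    change chan struct{}
}

func newActionSignal() *actionSignal {
    return &actionSignal{change: make(chan struct{})}
}

// CheckNextAction returns a channel that is closed the next time the value changes.
func (a *actionSignal) CheckNextAction() <-chan struct{} {
    a.mu.Lock()
    defer a.mu.Unlock()
    return a.change
}

// set updates the value and, on a real change, wakes all current waiters.
func (a *actionSignal) set(ok bool) {
    a.mu.Lock()
    defer a.mu.Unlock()
    if ok == a.ok {
        return
    }
    a.ok = ok
    close(a.change)                // wake everyone selecting on the old channel
    a.change = make(chan struct{}) // arm a fresh channel for the next change
}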