Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(op-node): Execute only part of the initialReset function each time #13

Open
wants to merge 2 commits into
base: testnet
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions op-node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,13 @@ var (
}(),
Category: RollupCategory,
}
PipelineInitialResetTimeout = &cli.Uint64Flag{
Name: "l1.pipeline-initial-reset-timeout",
Usage: "Number of L1 blocks, control pipeline initial reset.",
EnvVars: prefixEnvVars("PIPELINE_INITIAL_RESET_TIMEOUT"),
Value: 10,
Category: L1RPCCategory,
}
VerifierL1Confs = &cli.Uint64Flag{
Name: "verifier.l1-confs",
Usage: "Number of L1 blocks to keep distance from the L1 head before deriving L2 data from. Reorgs are supported, but may be slow to perform.",
Expand Down Expand Up @@ -419,6 +426,7 @@ var optionalFlags = []cli.Flag{
ConductorRpcTimeoutFlag,
SafeDBPath,
L2EngineKind,
PipelineInitialResetTimeout,
}

var DeprecatedFlags = []cli.Flag{
Expand Down
2 changes: 2 additions & 0 deletions op-node/node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type Config struct {

// AltDA config
AltDA altda.CLIConfig

PipelineInitialResetTimeout uint64
}

type RPCConfig struct {
Expand Down
2 changes: 1 addition & 1 deletion op-node/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config) error {
n.safeDB = safedb.Disabled
}
n.l2Driver = driver.NewDriver(n.eventSys, n.eventDrain, &cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source,
n.supervisor, n.beacon, n, n, n.log, n.metrics, cfg.ConfigPersistence, n.safeDB, &cfg.Sync, sequencerConductor, altDA)
n.supervisor, n.beacon, n, n, n.log, n.metrics, cfg.ConfigPersistence, n.safeDB, &cfg.Sync, sequencerConductor, altDA, cfg.PipelineInitialResetTimeout)
return nil
}

Expand Down
7 changes: 7 additions & 0 deletions op-node/rollup/derive/deriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"io"
"time"

"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
Expand Down Expand Up @@ -116,6 +117,8 @@ func (d *PipelineDeriver) OnEvent(ev event.Event) bool {
} else if err != nil && errors.Is(err, NotEnoughData) {
// don't do a backoff for this error
d.emitter.Emit(DeriverMoreEvent{})
} else if err != nil && errors.Is(err, InitialResetting) {
d.pipeline.pipelineInitialResetNextTime = time.Now().Add(d.pipeline.pipelineInitialResetDuration)
} else if err != nil {
d.pipeline.log.Error("Derivation process error", "err", err)
d.emitter.Emit(rollup.EngineTemporaryErrorEvent{Err: err})
Expand All @@ -132,6 +135,10 @@ func (d *PipelineDeriver) OnEvent(ev event.Event) bool {
case ConfirmReceivedAttributesEvent:
d.needAttributesConfirmation = false
default:
if d.pipeline.pipeLineIsInitialReset &&
d.pipeline.pipelineInitialResetNextTime.Before(time.Now()) {
d.emitter.Emit(PipelineStepEvent{PendingSafe: d.pipeline.pipelineInitialResetNextPendingSafe})
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • else ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If true, continue to execute the previous initialReset logic, if false, then no logic is executed

return false
}
return true
Expand Down
2 changes: 2 additions & 0 deletions op-node/rollup/derive/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ var (
// EngineELSyncing implies that the execution engine is currently in progress of syncing.
EngineELSyncing = errors.New("engine is performing EL sync")

InitialResetting = errors.New("pipeline is performing reset")

// Sentinel errors, use these to get the severity of errors by calling
// errors.Is(err, ErrTemporary) for example.
ErrTemporary = NewTemporaryError(nil)
Expand Down
69 changes: 51 additions & 18 deletions op-node/rollup/derive/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"context"
"errors"
"fmt"
"io"
"sync/atomic"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"io"
"sync/atomic"
"time"

"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
Expand Down Expand Up @@ -71,15 +71,23 @@ type DerivationPipeline struct {
origin eth.L1BlockRef
resetL2Safe eth.L2BlockRef
resetSysConfig eth.SystemConfig
// Its value is only 1 or 0

// InitialReset is resetting
pipeLineIsInitialReset bool
pipelineInitialResetTimeout uint64
pipelineInitialResetL2 eth.L2BlockRef
pipelineInitialResetNextTime time.Time
pipelineInitialResetDuration time.Duration
pipelineInitialResetNextPendingSafe eth.L2BlockRef

engineIsReset atomic.Bool

metrics Metrics
}

// NewDerivationPipeline creates a DerivationPipeline, to turn L1 data into L2 block-inputs.
func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L1Fetcher, l1Blobs L1BlobsFetcher,
altDA AltDAInputFetcher, l2Source L2Source, metrics Metrics) *DerivationPipeline {
altDA AltDAInputFetcher, l2Source L2Source, metrics Metrics, pipelineInitialResetTimeout uint64) *DerivationPipeline {

// Pull stages
l1Traversal := NewL1Traversal(log, rollupCfg, l1Fetcher)
Expand All @@ -98,15 +106,17 @@ func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L
stages := []ResettableStage{l1Traversal, l1Src, altDA, frameQueue, bank, chInReader, batchQueue, attributesQueue}

return &DerivationPipeline{
log: log,
rollupCfg: rollupCfg,
l1Fetcher: l1Fetcher,
altDA: altDA,
stages: stages,
metrics: metrics,
traversal: l1Traversal,
attrib: attributesQueue,
l2: l2Source,
log: log,
rollupCfg: rollupCfg,
l1Fetcher: l1Fetcher,
altDA: altDA,
stages: stages,
metrics: metrics,
traversal: l1Traversal,
attrib: attributesQueue,
l2: l2Source,
pipelineInitialResetTimeout: pipelineInitialResetTimeout,
pipelineInitialResetDuration: 5 * time.Second,
}
}

Expand Down Expand Up @@ -156,8 +166,14 @@ func (dp *DerivationPipeline) Step(ctx context.Context, pendingSafeHead eth.L2Bl
// so we can read all the L2 data necessary for constructing the next batches that come after the safe head.
if pendingSafeHead != dp.resetL2Safe {
if err := dp.initialReset(ctx, pendingSafeHead); err != nil {
if errors.Is(err, InitialResetting) {
dp.pipelineInitialResetNextPendingSafe = pendingSafeHead
return nil, err
}
return nil, fmt.Errorf("failed initial reset work: %w", err)
}
} else {
dp.pipeLineIsInitialReset = false
}

resetting := dp.resetting.Load()
Expand Down Expand Up @@ -204,17 +220,23 @@ func (dp *DerivationPipeline) initialReset(ctx context.Context, resetL2Safe eth.
// Walk back L2 chain to find the L1 origin that is old enough to start buffering channel data from.
pipelineL2 := resetL2Safe
l1Origin := resetL2Safe.L1Origin
if dp.pipeLineIsInitialReset {
pipelineL2 = dp.pipelineInitialResetL2
l1Origin = dp.pipelineInitialResetL2.L1Origin
}

pipelineOrigin, err := dp.l1Fetcher.L1BlockRefByHash(ctx, l1Origin.Hash)
if err != nil {
return NewTemporaryError(fmt.Errorf("failed to fetch the new L1 progress: origin: %s; err: %w", pipelineL2.L1Origin, err))
}

var afterChannelTimeout bool
for {
afterL2Genesis := pipelineL2.Number > dp.rollupCfg.Genesis.L2.Number
afterL1Genesis := pipelineL2.L1Origin.Number > dp.rollupCfg.Genesis.L1.Number
afterChannelTimeout := pipelineL2.L1Origin.Number+spec.ChannelTimeout(pipelineOrigin.Time) > l1Origin.Number
if afterL2Genesis && afterL1Genesis && afterChannelTimeout {
afterPipelineInitialResetTimeout := pipelineL2.L1Origin.Number+dp.pipelineInitialResetTimeout > l1Origin.Number
afterChannelTimeout = pipelineL2.L1Origin.Number+spec.ChannelTimeout(pipelineOrigin.Time) > l1Origin.Number
if afterL2Genesis && afterL1Genesis && afterPipelineInitialResetTimeout && afterChannelTimeout {
parent, err := dp.l2.L2BlockRefByHash(ctx, pipelineL2.ParentHash)
if err != nil {
return NewResetError(fmt.Errorf("failed to fetch L2 parent block %s", pipelineL2.ParentID()))
Expand All @@ -225,6 +247,12 @@ func (dp *DerivationPipeline) initialReset(ctx context.Context, resetL2Safe eth.
return NewTemporaryError(fmt.Errorf("failed to fetch the new L1 progress: origin: %s; err: %w", pipelineL2.L1Origin, err))
}
} else {
if afterChannelTimeout {
dp.pipelineInitialResetL2 = pipelineL2
dp.pipeLineIsInitialReset = true
} else {
dp.pipeLineIsInitialReset = false
}
break
}
}
Expand All @@ -236,8 +264,13 @@ func (dp *DerivationPipeline) initialReset(ctx context.Context, resetL2Safe eth.

dp.origin = pipelineOrigin
dp.resetSysConfig = sysCfg
dp.resetL2Safe = resetL2Safe
return nil
dp.resetL2Safe = pipelineL2

if afterChannelTimeout {
return InitialResetting
} else {
return nil
}
}

func (dp *DerivationPipeline) ConfirmEngineReset() {
Expand Down
3 changes: 2 additions & 1 deletion op-node/rollup/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func NewDriver(
syncCfg *sync.Config,
sequencerConductor conductor.SequencerConductor,
altDA AltDAIface,
pipelineInitialResetTimeout uint64,
) *Driver {
driverCtx, driverCancel := context.WithCancel(context.Background())

Expand Down Expand Up @@ -215,7 +216,7 @@ func NewDriver(
sys.Register("attributes-handler",
attributes.NewAttributesHandler(log, cfg, driverCtx, l2), opts)

derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l1Blobs, altDA, l2, metrics)
derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l1Blobs, altDA, l2, metrics, pipelineInitialResetTimeout)

sys.Register("pipeline",
derive.NewPipelineDeriver(driverCtx, derivationPipeline), opts)
Expand Down
5 changes: 4 additions & 1 deletion op-node/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
haltOption = ""
}

pipelineInitialResetTimeout := ctx.Uint64(flags.PipelineInitialResetTimeout.Name)

if ctx.IsSet(flags.HeartbeatEnabledFlag.Name) ||
ctx.IsSet(flags.HeartbeatMonikerFlag.Name) ||
ctx.IsSet(flags.HeartbeatURLFlag.Name) {
Expand Down Expand Up @@ -112,7 +114,8 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
ConductorRpc: ctx.String(flags.ConductorRpcFlag.Name),
ConductorRpcTimeout: ctx.Duration(flags.ConductorRpcTimeoutFlag.Name),

AltDA: altda.ReadCLIConfig(ctx),
AltDA: altda.ReadCLIConfig(ctx),
PipelineInitialResetTimeout: pipelineInitialResetTimeout,
}

if err := cfg.LoadPersisted(log); err != nil {
Expand Down
Loading