diff --git a/op-node/flags/flags.go b/op-node/flags/flags.go
index 54334c150296..69cc5f4e428a 100644
--- a/op-node/flags/flags.go
+++ b/op-node/flags/flags.go
@@ -203,6 +203,13 @@ var (
 		}(),
 		Category: RollupCategory,
 	}
+	PipelineInitialResetTimeout = &cli.Uint64Flag{
+		Name:     "l1.pipeline-initial-reset-timeout",
+		Usage:    "Maximum number of L1 blocks to walk back per step during the derivation pipeline's initial reset.",
+		EnvVars:  prefixEnvVars("PIPELINE_INITIAL_RESET_TIMEOUT"),
+		Value:    10,
+		Category: L1RPCCategory,
+	}
 	VerifierL1Confs = &cli.Uint64Flag{
 		Name:     "verifier.l1-confs",
 		Usage:    "Number of L1 blocks to keep distance from the L1 head before deriving L2 data from. Reorgs are supported, but may be slow to perform.",
@@ -419,6 +426,7 @@ var optionalFlags = []cli.Flag{
 	ConductorRpcTimeoutFlag,
 	SafeDBPath,
 	L2EngineKind,
+	PipelineInitialResetTimeout,
 }
 
 var DeprecatedFlags = []cli.Flag{
diff --git a/op-node/node/config.go b/op-node/node/config.go
index a78b55853aa2..f894d5aea823 100644
--- a/op-node/node/config.go
+++ b/op-node/node/config.go
@@ -74,6 +74,8 @@ type Config struct {
 
 	// AltDA config
 	AltDA altda.CLIConfig
+
+	PipelineInitialResetTimeout uint64
 }
 
 type RPCConfig struct {
diff --git a/op-node/node/node.go b/op-node/node/node.go
index 6d7cb9968fdd..935689eb92ad 100644
--- a/op-node/node/node.go
+++ b/op-node/node/node.go
@@ -425,7 +425,7 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config) error {
 		n.safeDB = safedb.Disabled
 	}
 	n.l2Driver = driver.NewDriver(n.eventSys, n.eventDrain, &cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source,
-		n.supervisor, n.beacon, n, n, n.log, n.metrics, cfg.ConfigPersistence, n.safeDB, &cfg.Sync, sequencerConductor, altDA)
+		n.supervisor, n.beacon, n, n, n.log, n.metrics, cfg.ConfigPersistence, n.safeDB, &cfg.Sync, sequencerConductor, altDA, cfg.PipelineInitialResetTimeout)
 	return nil
 }
diff --git a/op-node/rollup/derive/deriver.go b/op-node/rollup/derive/deriver.go
index 760891648524..9721d595c5f8 100644
--- a/op-node/rollup/derive/deriver.go
+++ b/op-node/rollup/derive/deriver.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"io"
+	"time"
 
 	"github.com/ethereum-optimism/optimism/op-node/rollup"
 	"github.com/ethereum-optimism/optimism/op-node/rollup/event"
@@ -116,6 +117,8 @@ func (d *PipelineDeriver) OnEvent(ev event.Event) bool {
 		} else if err != nil && errors.Is(err, NotEnoughData) {
 			// don't do a backoff for this error
 			d.emitter.Emit(DeriverMoreEvent{})
+		} else if err != nil && errors.Is(err, ErrInitialResetting) {
+			d.pipeline.pipelineInitialResetNextTime = time.Now().Add(d.pipeline.pipelineInitialResetDuration)
 		} else if err != nil {
 			d.pipeline.log.Error("Derivation process error", "err", err)
 			d.emitter.Emit(rollup.EngineTemporaryErrorEvent{Err: err})
@@ -132,6 +135,10 @@ func (d *PipelineDeriver) OnEvent(ev event.Event) bool {
 	case ConfirmReceivedAttributesEvent:
 		d.needAttributesConfirmation = false
 	default:
+		if d.pipeline.pipelineIsInitialReset &&
+			d.pipeline.pipelineInitialResetNextTime.Before(time.Now()) {
+			d.emitter.Emit(PipelineStepEvent{PendingSafe: d.pipeline.pipelineInitialResetNextPendingSafe})
+		}
 		return false
 	}
 	return true
diff --git a/op-node/rollup/derive/error.go b/op-node/rollup/derive/error.go
index a09255144056..159973517789 100644
--- a/op-node/rollup/derive/error.go
+++ b/op-node/rollup/derive/error.go
@@ -15,6 +15,10 @@ var (
 	// EngineELSyncing implies that the execution engine is currently in progress of syncing.
 	EngineELSyncing = errors.New("engine is performing EL sync")
 
+	// ErrInitialResetting signals that the pipeline's initial reset has not yet
+	// finished walking back, and will be resumed on a later step.
+	ErrInitialResetting = errors.New("pipeline is performing reset")
+
 	// Sentinel errors, use these to get the severity of errors by calling
 	// errors.Is(err, ErrTemporary) for example.
 	ErrTemporary = NewTemporaryError(nil)
diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go
index 4124382a9cc0..932ed5ded580 100644
--- a/op-node/rollup/derive/pipeline.go
+++ b/op-node/rollup/derive/pipeline.go
@@ -4,11 +4,12 @@ import (
 	"context"
 	"errors"
 	"fmt"
 	"io"
 	"sync/atomic"
+	"time"
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/log"
 
 	"github.com/ethereum-optimism/optimism/op-node/rollup"
 	"github.com/ethereum-optimism/optimism/op-service/eth"
@@ -71,7 +71,17 @@ type DerivationPipeline struct {
 	origin         eth.L1BlockRef
 	resetL2Safe    eth.L2BlockRef
 	resetSysConfig eth.SystemConfig
-	// Its value is only 1 or 0
+
+	// Incremental initial-reset state: set while the pipeline is still walking
+	// back toward its reset target in bounded steps of at most
+	// pipelineInitialResetTimeout L1 blocks per step.
+	pipelineIsInitialReset              bool
+	pipelineInitialResetTimeout         uint64
+	pipelineInitialResetL2              eth.L2BlockRef
+	pipelineInitialResetNextTime        time.Time
+	pipelineInitialResetDuration        time.Duration
+	pipelineInitialResetNextPendingSafe eth.L2BlockRef
+
 	engineIsReset atomic.Bool
 
 	metrics Metrics
 }
@@ -79,7 +89,7 @@ type DerivationPipeline struct {
 
 // NewDerivationPipeline creates a DerivationPipeline, to turn L1 data into L2 block-inputs.
 func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L1Fetcher, l1Blobs L1BlobsFetcher,
-	altDA AltDAInputFetcher, l2Source L2Source, metrics Metrics) *DerivationPipeline {
+	altDA AltDAInputFetcher, l2Source L2Source, metrics Metrics, pipelineInitialResetTimeout uint64) *DerivationPipeline {
 
 	// Pull stages
 	l1Traversal := NewL1Traversal(log, rollupCfg, l1Fetcher)
@@ -98,15 +108,17 @@ func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L
 	stages := []ResettableStage{l1Traversal, l1Src, altDA, frameQueue, bank, chInReader, batchQueue, attributesQueue}
 
 	return &DerivationPipeline{
-		log:       log,
-		rollupCfg: rollupCfg,
-		l1Fetcher: l1Fetcher,
-		altDA:     altDA,
-		stages:    stages,
-		metrics:   metrics,
-		traversal: l1Traversal,
-		attrib:    attributesQueue,
-		l2:        l2Source,
+		log:                          log,
+		rollupCfg:                    rollupCfg,
+		l1Fetcher:                    l1Fetcher,
+		altDA:                        altDA,
+		stages:                       stages,
+		metrics:                      metrics,
+		traversal:                    l1Traversal,
+		attrib:                       attributesQueue,
+		l2:                           l2Source,
+		pipelineInitialResetTimeout:  pipelineInitialResetTimeout,
+		pipelineInitialResetDuration: 5 * time.Second,
 	}
 }
@@ -156,8 +168,14 @@ func (dp *DerivationPipeline) Step(ctx context.Context, pendingSafeHead eth.L2Bl
 	// so we can read all the L2 data necessary for constructing the next batches that come after the safe head.
 	if pendingSafeHead != dp.resetL2Safe {
 		if err := dp.initialReset(ctx, pendingSafeHead); err != nil {
+			if errors.Is(err, ErrInitialResetting) {
+				dp.pipelineInitialResetNextPendingSafe = pendingSafeHead
+				return nil, err
+			}
 			return nil, fmt.Errorf("failed initial reset work: %w", err)
 		}
+	} else {
+		dp.pipelineIsInitialReset = false
 	}
 
 	resetting := dp.resetting.Load()
@@ -204,17 +222,23 @@ func (dp *DerivationPipeline) initialReset(ctx context.Context, resetL2Safe eth.
 	// Walk back L2 chain to find the L1 origin that is old enough to start buffering channel data from.
 	pipelineL2 := resetL2Safe
 	l1Origin := resetL2Safe.L1Origin
+	if dp.pipelineIsInitialReset {
+		// Resume the walk-back from where the previous bounded step stopped.
+		pipelineL2 = dp.pipelineInitialResetL2
+		l1Origin = dp.pipelineInitialResetL2.L1Origin
+	}
 	pipelineOrigin, err := dp.l1Fetcher.L1BlockRefByHash(ctx, l1Origin.Hash)
 	if err != nil {
 		return NewTemporaryError(fmt.Errorf("failed to fetch the new L1 progress: origin: %s; err: %w", pipelineL2.L1Origin, err))
 	}
+	var afterChannelTimeout bool
 	for {
 		afterL2Genesis := pipelineL2.Number > dp.rollupCfg.Genesis.L2.Number
 		afterL1Genesis := pipelineL2.L1Origin.Number > dp.rollupCfg.Genesis.L1.Number
-		afterChannelTimeout := pipelineL2.L1Origin.Number+spec.ChannelTimeout(pipelineOrigin.Time) > l1Origin.Number
-		if afterL2Genesis && afterL1Genesis && afterChannelTimeout {
+		afterPipelineInitialResetTimeout := pipelineL2.L1Origin.Number+dp.pipelineInitialResetTimeout > l1Origin.Number
+		afterChannelTimeout = pipelineL2.L1Origin.Number+spec.ChannelTimeout(pipelineOrigin.Time) > l1Origin.Number
+		if afterL2Genesis && afterL1Genesis && afterPipelineInitialResetTimeout && afterChannelTimeout {
 			parent, err := dp.l2.L2BlockRefByHash(ctx, pipelineL2.ParentHash)
 			if err != nil {
 				return NewResetError(fmt.Errorf("failed to fetch L2 parent block %s", pipelineL2.ParentID()))
@@ -225,6 +249,12 @@ func (dp *DerivationPipeline) initialReset(ctx context.Context, resetL2Safe eth.
 				return NewTemporaryError(fmt.Errorf("failed to fetch the new L1 progress: origin: %s; err: %w", pipelineL2.L1Origin, err))
 			}
 		} else {
+			if afterChannelTimeout {
+				dp.pipelineInitialResetL2 = pipelineL2
+				dp.pipelineIsInitialReset = true
+			} else {
+				dp.pipelineIsInitialReset = false
+			}
 			break
 		}
 	}
@@ -236,8 +266,13 @@ func (dp *DerivationPipeline) initialReset(ctx context.Context, resetL2Safe eth.
 
 	dp.origin = pipelineOrigin
 	dp.resetSysConfig = sysCfg
-	dp.resetL2Safe = resetL2Safe
-	return nil
+	dp.resetL2Safe = pipelineL2
+
+	if afterChannelTimeout {
+		// Not fully walked back yet; report resumable progress to the deriver.
+		return ErrInitialResetting
+	}
+	return nil
 }
 
 func (dp *DerivationPipeline) ConfirmEngineReset() {
diff --git a/op-node/rollup/driver/driver.go b/op-node/rollup/driver/driver.go
index 81607e612d5a..961211ba3916 100644
--- a/op-node/rollup/driver/driver.go
+++ b/op-node/rollup/driver/driver.go
@@ -173,6 +173,7 @@ func NewDriver(
 	syncCfg *sync.Config,
 	sequencerConductor conductor.SequencerConductor,
 	altDA AltDAIface,
+	pipelineInitialResetTimeout uint64,
 ) *Driver {
 	driverCtx, driverCancel := context.WithCancel(context.Background())
 
@@ -215,7 +216,7 @@ func NewDriver(
 	sys.Register("attributes-handler",
 		attributes.NewAttributesHandler(log, cfg, driverCtx, l2), opts)
 
-	derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l1Blobs, altDA, l2, metrics)
+	derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l1Blobs, altDA, l2, metrics, pipelineInitialResetTimeout)
 
 	sys.Register("pipeline", derive.NewPipelineDeriver(driverCtx, derivationPipeline), opts)
diff --git a/op-node/service.go b/op-node/service.go
index b24e2a638335..d2666424217c 100644
--- a/op-node/service.go
+++ b/op-node/service.go
@@ -75,6 +75,8 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
 		haltOption = ""
 	}
 
+	pipelineInitialResetTimeout := ctx.Uint64(flags.PipelineInitialResetTimeout.Name)
+
 	if ctx.IsSet(flags.HeartbeatEnabledFlag.Name) ||
 		ctx.IsSet(flags.HeartbeatMonikerFlag.Name) ||
 		ctx.IsSet(flags.HeartbeatURLFlag.Name) {
@@ -112,7 +114,8 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
 		ConductorRpc:        ctx.String(flags.ConductorRpcFlag.Name),
 		ConductorRpcTimeout: ctx.Duration(flags.ConductorRpcTimeoutFlag.Name),
 
-		AltDA: altda.ReadCLIConfig(ctx),
+		AltDA:                       altda.ReadCLIConfig(ctx),
+		PipelineInitialResetTimeout: pipelineInitialResetTimeout,
 	}
 
 	if err := cfg.LoadPersisted(log); err != nil {