Skip to content

Commit

Permalink
op-node/rollup/derive: Add Holocene Channel Stage (ethereum-optimism#…
Browse files Browse the repository at this point in the history
…12334)

This only adds the new stage, but doesn't wire it into the derivation
pipeline yet.
  • Loading branch information
sebastianst authored Oct 15, 2024
1 parent f0d7738 commit 6ae28f5
Show file tree
Hide file tree
Showing 9 changed files with 389 additions and 22 deletions.
2 changes: 1 addition & 1 deletion op-node/cmd/batch_decoder/reassemble/reassemble.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func writeChannel(ch ChannelWithMetadata, filename string) error {
// from the channel. Returns a ChannelWithMetadata struct containing all the relevant data.
func ProcessFrames(cfg Config, rollupCfg *rollup.Config, id derive.ChannelID, frames []FrameWithMetadata) ChannelWithMetadata {
spec := rollup.NewChainSpec(rollupCfg)
ch := derive.NewChannel(id, eth.L1BlockRef{Number: frames[0].InclusionBlock})
ch := derive.NewChannel(id, eth.L1BlockRef{Number: frames[0].InclusionBlock}, rollupCfg.IsHolocene(frames[0].Timestamp))
invalidFrame := false

for _, frame := range frames {
Expand Down
35 changes: 24 additions & 11 deletions op-node/rollup/derive/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ const (
)

// A Channel is a set of batches that are split into at least one, but possibly multiple frames.
// Frames are allowed to be ingested out of order.
// Frames are allowed to be ingested out of order, unless the channel is set to follow Holocene
// rules.
// Each frame is ingested one by one. Once a frame with `closed` is added to the channel, the
// channel may mark itself as ready for reading once all intervening frames have been added
type Channel struct {
// id of the channel
id ChannelID
openBlock eth.L1BlockRef
id ChannelID
openBlock eth.L1BlockRef
requireInOrder bool

// estimated memory size, used to drop the channel if we have too much data
size uint64
Expand All @@ -45,11 +47,14 @@ type Channel struct {
highestL1InclusionBlock eth.L1BlockRef
}

func NewChannel(id ChannelID, openBlock eth.L1BlockRef) *Channel {
// NewChannel creates a new channel with the given id and openening block. If requireInOrder is
// true, frames must be added in order.
func NewChannel(id ChannelID, openBlock eth.L1BlockRef, requireInOrder bool) *Channel {
return &Channel{
id: id,
inputs: make(map[uint64]Frame),
openBlock: openBlock,
id: id,
inputs: make(map[uint64]Frame),
openBlock: openBlock,
requireInOrder: requireInOrder,
}
}

Expand All @@ -70,18 +75,22 @@ func (ch *Channel) AddFrame(frame Frame, l1InclusionBlock eth.L1BlockRef) error
if ch.closed && frame.FrameNumber >= ch.endFrameNumber {
return fmt.Errorf("frame number (%d) is greater than or equal to end frame number (%d) of a closed channel", frame.FrameNumber, ch.endFrameNumber)
}
if ch.requireInOrder && int(frame.FrameNumber) != len(ch.inputs) {
return fmt.Errorf("frame out of order, expected %d, got %d", len(ch.inputs), frame.FrameNumber)
}

// Guaranteed to succeed. Now update internal state
if frame.IsLast {
ch.endFrameNumber = frame.FrameNumber
ch.closed = true
}
// Prune frames with a number higher than the closing frame number when we receive a closing frame
// Prune frames with a number higher than the closing frame number when we receive a closing frame.
// Note that the following condition is guaranteed to never be true with strict Holocene ordering.
if frame.IsLast && ch.endFrameNumber < ch.highestFrameNumber {
// Do a linear scan over saved inputs instead of ranging over ID numbers
for id, prunedFrame := range ch.inputs {
if id >= uint64(ch.endFrameNumber) {
delete(ch.inputs, id)
for idx, prunedFrame := range ch.inputs {
if idx >= uint64(ch.endFrameNumber) {
delete(ch.inputs, idx)
}
ch.size -= frameSize(prunedFrame)
}
Expand Down Expand Up @@ -119,6 +128,10 @@ func (ch *Channel) Size() uint64 {
return ch.size
}

func (ch *Channel) ID() ChannelID {
return ch.id
}

// IsReady returns true iff the channel is ready to be read.
func (ch *Channel) IsReady() bool {
// Must see the last frame before the channel is ready to be read
Expand Down
133 changes: 133 additions & 0 deletions op-node/rollup/derive/channel_assembler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package derive

import (
"context"
"errors"
"io"

"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/log"
)

// ChannelAssembler assembles frames into a raw channel. It replaces the ChannelBank since Holocene.
type ChannelAssembler struct {
log log.Logger
spec ChannelStageSpec
metrics Metrics

channel *Channel

prev NextFrameProvider
}

var _ ResettableStage = (*ChannelAssembler)(nil)

type ChannelStageSpec interface {
ChannelTimeout(t uint64) uint64
MaxRLPBytesPerChannel(t uint64) uint64
}

// NewChannelStage creates a Holocene ChannelStage.
// It must only be used for derivation from Holocene activation.
func NewChannelStage(log log.Logger, spec ChannelStageSpec, prev NextFrameProvider, m Metrics) *ChannelAssembler {
return &ChannelAssembler{
log: log,
spec: spec,
metrics: m,
prev: prev,
}
}

func (ca *ChannelAssembler) Log() log.Logger {
return ca.log.New("stage", "channel", "origin", ca.Origin())
}

func (ca *ChannelAssembler) Origin() eth.L1BlockRef {
return ca.prev.Origin()
}

func (ca *ChannelAssembler) Reset(context.Context, eth.L1BlockRef, eth.SystemConfig) error {
ca.resetChannel()
return io.EOF
}

func (ca *ChannelAssembler) resetChannel() {
ca.channel = nil
}

// Returns whether the current staging channel is timed out. Panics if there's no current channel.
func (ca *ChannelAssembler) channelTimedOut() bool {
return ca.channel.OpenBlockNumber()+ca.spec.ChannelTimeout(ca.Origin().Time) < ca.Origin().Number
}

func (ca *ChannelAssembler) NextData(ctx context.Context) ([]byte, error) {
if ca.channel != nil && ca.channelTimedOut() {
ca.metrics.RecordChannelTimedOut()
ca.resetChannel()
}

lgr := ca.Log()
origin := ca.Origin()

// Note that if the current channel was already completed, we would have forwarded its data
// already. So we start by reading in frames.
if ca.channel != nil && ca.channel.IsReady() {
return nil, NewCriticalError(errors.New("unexpected ready channel"))
}

// Ingest frames until we either hit an error (including io.EOF and NotEnoughData) or complete a
// channel.
// Note that we ingest the frame queue in a loop instead of returning NotEnoughData after a
// single frame ingestion, because it is guaranteed that the total size of new frames ingested
// per L1 origin block is limited by the size of batcher transactions in that block and it
// doesn't make a difference in computational effort if these are many small frames or one large
// frame of that size. Plus, this is really just moving data around, no decompression etc. yet.
for {
frame, err := ca.prev.NextFrame(ctx)
if err != nil { // includes io.EOF; a last frame broke the loop already
return nil, err
}

// first frames always start a new channel, discarding an existing one
if frame.FrameNumber == 0 {
ca.metrics.RecordHeadChannelOpened()
ca.channel = NewChannel(frame.ID, origin, true)
}
if frame.FrameNumber > 0 && ca.channel == nil {
lgr.Warn("dropping non-first frame without channel",
"frame_channel", frame.ID, "frame_number", frame.FrameNumber)
continue // read more frames
}

// Catches Holocene ordering rules. Note that even though the frame queue is guaranteed to
// only hold ordered frames in the current queue, it cannot guarantee this w.r.t. frames
// that already got dequeued. So ordering has to be checked here again.
if err := ca.channel.AddFrame(frame, origin); err != nil {
lgr.Warn("failed to add frame to channel",
"channel", ca.channel.ID(), "frame_channel", frame.ID,
"frame_number", frame.FrameNumber, "err", err)
continue // read more frames
}
if ca.channel.Size() > ca.spec.MaxRLPBytesPerChannel(ca.Origin().Time) {
lgr.Warn("dropping oversized channel",
"channel", ca.channel.ID(), "frame_number", frame.FrameNumber)
ca.resetChannel()
continue // read more frames
}
ca.metrics.RecordFrame()

if frame.IsLast {
break // forward current complete channel
}
}

ch := ca.channel
// Note that if we exit the frame ingestion loop, we're guaranteed to have a ready channel.
if ch == nil || !ch.IsReady() {
return nil, NewCriticalError(errors.New("unexpected non-ready channel"))
}

ca.resetChannel()
r := ch.Reader()
return io.ReadAll(r)
}
169 changes: 169 additions & 0 deletions op-node/rollup/derive/channel_assembler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package derive

import (
"context"
"io"
"log/slog"
"testing"

"github.com/stretchr/testify/require"

"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
rolluptest "github.com/ethereum-optimism/optimism/op-node/rollup/test"
"github.com/ethereum-optimism/optimism/op-service/testlog"
)

func TestChannelStage_NextData(t *testing.T) {
for _, tc := range []struct {
desc string
frames [][]testFrame
expErr []error
expData []string
expChID []string
rlpOverride *uint64
}{
{
desc: "simple",
frames: [][]testFrame{
{"a:0:first!"},
},
expErr: []error{nil},
expData: []string{"first"},
expChID: []string{""},
},
{
desc: "simple-two",
frames: [][]testFrame{
{"a:0:first", "a:1:second!"},
},
expErr: []error{nil},
expData: []string{"firstsecond"},
expChID: []string{""},
},
{
desc: "drop-other",
frames: [][]testFrame{
{"a:0:first", "b:1:foo"},
{"a:1:second", "c:1:bar!"},
{"a:2:third!"},
},
expErr: []error{io.EOF, io.EOF, nil},
expData: []string{"", "", "firstsecondthird"},
expChID: []string{"a", "a", ""},
},
{
desc: "drop-non-first",
frames: [][]testFrame{
{"a:1:foo"},
},
expErr: []error{io.EOF},
expData: []string{""},
expChID: []string{""},
},
{
desc: "first-discards",
frames: [][]testFrame{
{"b:0:foo"},
{"a:0:first!"},
},
expErr: []error{io.EOF, nil},
expData: []string{"", "first"},
expChID: []string{"b", ""},
},
{
desc: "already-closed",
frames: [][]testFrame{
{"a:0:foo"},
{"a:1:bar!", "a:2:baz!"},
},
expErr: []error{io.EOF, nil},
expData: []string{"", "foobar"},
expChID: []string{"a", ""},
},
{
desc: "max-size",
frames: [][]testFrame{
{"a:0:0123456789!"},
},
expErr: []error{nil},
expData: []string{"0123456789"},
expChID: []string{""},
rlpOverride: ptr[uint64](frameOverhead + 10),
},
{
desc: "oversized",
frames: [][]testFrame{
{"a:0:0123456789x!"},
},
expErr: []error{io.EOF},
expData: []string{""},
expChID: []string{""},
rlpOverride: ptr[uint64](frameOverhead + 10),
},
} {
t.Run(tc.desc, func(t *testing.T) {
fq := &fakeChannelBankInput{}
lgr := testlog.Logger(t, slog.LevelWarn)
spec := &rolluptest.ChainSpec{
ChainSpec: rollup.NewChainSpec(&rollup.Config{}),

MaxRLPBytesPerChannelOverride: tc.rlpOverride,
}
cs := NewChannelStage(lgr, spec, fq, metrics.NoopMetrics)

for i, fs := range tc.frames {
fq.AddFrames(fs...)
data, err := cs.NextData(context.Background())
require.Equal(t, tc.expData[i], string(data))
require.ErrorIs(t, tc.expErr[i], err)
// invariant: never holds a ready channel
require.True(t, cs.channel == nil || !cs.channel.IsReady())

cid := tc.expChID[i]
if cid == "" {
require.Nil(t, cs.channel)
} else {
require.Equal(t, strChannelID(cid), cs.channel.ID())
}
}

// final call should always be io.EOF after exhausting frame queue
data, err := cs.NextData(context.Background())
require.Nil(t, data)
require.Equal(t, io.EOF, err)
})
}
}

func TestChannelStage_NextData_Timeout(t *testing.T) {
require := require.New(t)
fq := &fakeChannelBankInput{}
lgr := testlog.Logger(t, slog.LevelWarn)
spec := rollup.NewChainSpec(&rollup.Config{GraniteTime: ptr(uint64(0))}) // const channel timeout
cs := NewChannelStage(lgr, spec, fq, metrics.NoopMetrics)

fq.AddFrames("a:0:foo")
data, err := cs.NextData(context.Background())
require.Nil(data)
require.Equal(io.EOF, err)
require.NotNil(cs.channel)
require.Equal(strChannelID("a"), cs.channel.ID())

// move close to timeout
fq.origin.Number = spec.ChannelTimeout(0)
fq.AddFrames("a:1:bar")
data, err = cs.NextData(context.Background())
require.Nil(data)
require.Equal(io.EOF, err)
require.NotNil(cs.channel)
require.Equal(strChannelID("a"), cs.channel.ID())

// timeout channel by moving origin past timeout
fq.origin.Number = spec.ChannelTimeout(0) + 1
fq.AddFrames("a:2:baz!")
data, err = cs.NextData(context.Background())
require.Nil(data)
require.Equal(io.EOF, err)
require.Nil(cs.channel)
}
2 changes: 1 addition & 1 deletion op-node/rollup/derive/channel_bank.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (cb *ChannelBank) IngestFrame(f Frame) {
cb.metrics.RecordHeadChannelOpened()
}
// create new channel if it doesn't exist yet
currentCh = NewChannel(f.ID, origin)
currentCh = NewChannel(f.ID, origin, false)
cb.channels[f.ID] = currentCh
cb.channelQueue = append(cb.channelQueue, f.ID)
log.Info("created new channel")
Expand Down
Loading

0 comments on commit 6ae28f5

Please sign in to comment.