Skip to content

Commit

Permalink
Merge branch 'master' into trace-state-and-auto-redeems
Browse files Browse the repository at this point in the history
  • Loading branch information
rachel-bousfield authored Apr 11, 2022
2 parents 54d93b3 + fba6bdb commit 832efa3
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 29 deletions.
2 changes: 1 addition & 1 deletion arbnode/delayed_sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (d *DelayedSequencer) update(ctx context.Context, lastBlockHeader *types.He
}

func (d *DelayedSequencer) run(ctx context.Context) {
headerChan, cancel := d.l1Reader.Subscribe()
headerChan, cancel := d.l1Reader.Subscribe(false)
defer cancel()

for {
Expand Down
81 changes: 55 additions & 26 deletions arbnode/l1reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@ import (

type L1Reader struct {
util.StopWaiter
config L1ReaderConfig
client arbutil.L1Interface
outChannels map[chan<- *types.Header]struct{}
outChannelsBehind map[chan<- *types.Header]struct{}
chanMutex sync.Mutex
lastBroadcastHash common.Hash
lastBroadcastHeader *types.Header
lastPendingBlockNr uint64
config L1ReaderConfig
client arbutil.L1Interface

chanMutex sync.Mutex
// All fields below require the chanMutex
outChannels map[chan<- *types.Header]struct{}
outChannelsBehind map[chan<- *types.Header]struct{}
lastBroadcastHash common.Hash
lastBroadcastHeader *types.Header
lastPendingCallBlockNr uint64
requiresPendingCallUpdates int
}

type L1ReaderConfig struct {
Expand Down Expand Up @@ -67,20 +70,30 @@ func NewL1Reader(client arbutil.L1Interface, config L1ReaderConfig) *L1Reader {
}

// Subscribers are notified when there is a change.
func (s *L1Reader) Subscribe() (<-chan *types.Header, func()) {
// Channel could be missing headers and have duplicates.
// Listening to the channel will make sure listenere is notified when header changes.
func (s *L1Reader) Subscribe(requireBlockNrUpdates bool) (<-chan *types.Header, func()) {
s.chanMutex.Lock()
defer s.chanMutex.Unlock()

if requireBlockNrUpdates {
s.requiresPendingCallUpdates++
}
result := make(chan *types.Header)
outchannel := (chan<- *types.Header)(result)
s.outChannelsBehind[outchannel] = struct{}{}
unsubscribeFunc := func() { s.unsubscribe(outchannel) }
unsubscribeFunc := func() { s.unsubscribe(requireBlockNrUpdates, outchannel) }
return result, unsubscribeFunc
}

func (s *L1Reader) unsubscribe(from chan<- *types.Header) {
func (s *L1Reader) unsubscribe(requireBlockNrUpdates bool, from chan<- *types.Header) {
s.chanMutex.Lock()
defer s.chanMutex.Unlock()

if requireBlockNrUpdates {
s.requiresPendingCallUpdates--
}

if _, ok := s.outChannels[from]; ok {
delete(s.outChannels, from)
close(from)
Expand All @@ -95,6 +108,8 @@ func (s *L1Reader) closeAll() {
s.chanMutex.Lock()
defer s.chanMutex.Unlock()

s.requiresPendingCallUpdates = 0

for ch := range s.outChannels {
delete(s.outChannels, ch)
close(ch)
Expand All @@ -118,15 +133,17 @@ func (s *L1Reader) possiblyBroadcast(h *types.Header) {
s.lastBroadcastHeader = h
}

pendingBlockNr, err := arbutil.GetPendingCallBlockNumber(s.GetContext(), s.client)
if err == nil && pendingBlockNr.IsUint64() {
pendingU64 := pendingBlockNr.Uint64()
if pendingU64 > s.lastPendingBlockNr {
broadcastThis = true
s.lastPendingBlockNr = pendingU64
if s.requiresPendingCallUpdates > 0 {
pendingCallBlockNr, err := arbutil.GetPendingCallBlockNumber(s.GetContext(), s.client)
if err == nil && pendingCallBlockNr.IsUint64() {
pendingU64 := pendingCallBlockNr.Uint64()
if pendingU64 > s.lastPendingCallBlockNr {
broadcastThis = true
s.lastPendingCallBlockNr = pendingU64
}
} else {
log.Warn("GetPendingCallBlockNr: bad result", "err", err, "number", pendingCallBlockNr)
}
} else {
log.Warn("GetPendingBlockNr: bad result", "err", err, "number", pendingBlockNr)
}

if broadcastThis {
Expand Down Expand Up @@ -203,19 +220,17 @@ func (s *L1Reader) broadcastLoop(ctx context.Context) {
}

func (s *L1Reader) WaitForTxApproval(ctxIn context.Context, tx *types.Transaction) (*types.Receipt, error) {
headerchan, unsubscribe := s.Subscribe()
headerchan, unsubscribe := s.Subscribe(true)
defer unsubscribe()
ctx, cancel := context.WithTimeout(ctxIn, s.config.TxTimeout)
defer cancel()
txHash := tx.Hash()
for {
receipt, err := s.client.TransactionReceipt(ctx, txHash)
if err == nil {
callBlockNr, err := arbutil.GetPendingCallBlockNumber(ctx, s.client)
if err != nil {
return nil, err
}
if callBlockNr.Cmp(receipt.BlockNumber) > 0 {
if err == nil && receipt.BlockNumber.IsUint64() {
receiptBlockNr := receipt.BlockNumber.Uint64()
callBlockNr := s.LastPendingCallBlockNr()
if callBlockNr > receiptBlockNr {
return receipt, arbutil.DetailTxError(ctx, s.client, tx, receipt)
}
}
Expand All @@ -240,6 +255,20 @@ func (s *L1Reader) LastHeader(ctx context.Context) (*types.Header, error) {
return s.client.HeaderByNumber(ctx, nil)
}

func (s *L1Reader) UpdatingPendingCallBlockNr() bool {
s.chanMutex.Lock()
defer s.chanMutex.Unlock()
return s.requiresPendingCallUpdates > 0
}

// blocknumber used by pending calls.
// only updated if UpdatingPendingCallBlockNr returns true
func (s *L1Reader) LastPendingCallBlockNr() uint64 {
s.chanMutex.Lock()
defer s.chanMutex.Unlock()
return s.lastPendingCallBlockNr
}

func (s *L1Reader) Client() arbutil.L1Interface {
return s.client
}
Expand Down
2 changes: 1 addition & 1 deletion arbnode/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func (s *Sequencer) Start(ctxIn context.Context) error {
return errors.New("sequencer not initialized")
}

headerChan, cancel := s.l1Reader.Subscribe()
headerChan, cancel := s.l1Reader.Subscribe(false)

s.LaunchThread(func(ctx context.Context) {
defer cancel()
Expand Down
2 changes: 1 addition & 1 deletion validator/stateless_block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type InboxReaderInterface interface {

type L1ReaderInterface interface {
Client() arbutil.L1Interface
Subscribe() (<-chan *types.Header, func())
Subscribe(bool) (<-chan *types.Header, func())
WaitForTxApproval(ctx context.Context, tx *types.Transaction) (*types.Receipt, error)
}

Expand Down

0 comments on commit 832efa3

Please sign in to comment.