diff --git a/arbnode/delayed_sequencer.go b/arbnode/delayed_sequencer.go index 344a3742df..797f07c659 100644 --- a/arbnode/delayed_sequencer.go +++ b/arbnode/delayed_sequencer.go @@ -154,7 +154,7 @@ func (d *DelayedSequencer) update(ctx context.Context, lastBlockHeader *types.He } func (d *DelayedSequencer) run(ctx context.Context) { - headerChan, cancel := d.l1Reader.Subscribe() + headerChan, cancel := d.l1Reader.Subscribe(false) defer cancel() for { diff --git a/arbnode/l1reader.go b/arbnode/l1reader.go index 7d69af03bd..7b0b752804 100644 --- a/arbnode/l1reader.go +++ b/arbnode/l1reader.go @@ -17,14 +17,17 @@ import ( type L1Reader struct { util.StopWaiter - config L1ReaderConfig - client arbutil.L1Interface - outChannels map[chan<- *types.Header]struct{} - outChannelsBehind map[chan<- *types.Header]struct{} - chanMutex sync.Mutex - lastBroadcastHash common.Hash - lastBroadcastHeader *types.Header - lastPendingBlockNr uint64 + config L1ReaderConfig + client arbutil.L1Interface + + chanMutex sync.Mutex + // All fields below require the chanMutex + outChannels map[chan<- *types.Header]struct{} + outChannelsBehind map[chan<- *types.Header]struct{} + lastBroadcastHash common.Hash + lastBroadcastHeader *types.Header + lastPendingCallBlockNr uint64 + requiresPendingCallUpdates int } type L1ReaderConfig struct { @@ -67,20 +70,30 @@ func NewL1Reader(client arbutil.L1Interface, config L1ReaderConfig) *L1Reader { } // Subscribers are notified when there is a change. -func (s *L1Reader) Subscribe() (<-chan *types.Header, func()) { +// Channel could be missing headers and have duplicates. +// Listening to the channel will make sure listenere is notified when header changes. +func (s *L1Reader) Subscribe(requireBlockNrUpdates bool) (<-chan *types.Header, func()) { s.chanMutex.Lock() defer s.chanMutex.Unlock() + if requireBlockNrUpdates { + s.requiresPendingCallUpdates++ + } result := make(chan *types.Header) outchannel := (chan<- *types.Header)(result) s.outChannelsBehind[outchannel] = struct{}{} - unsubscribeFunc := func() { s.unsubscribe(outchannel) } + unsubscribeFunc := func() { s.unsubscribe(requireBlockNrUpdates, outchannel) } return result, unsubscribeFunc } -func (s *L1Reader) unsubscribe(from chan<- *types.Header) { +func (s *L1Reader) unsubscribe(requireBlockNrUpdates bool, from chan<- *types.Header) { s.chanMutex.Lock() defer s.chanMutex.Unlock() + + if requireBlockNrUpdates { + s.requiresPendingCallUpdates-- + } + if _, ok := s.outChannels[from]; ok { delete(s.outChannels, from) close(from) @@ -95,6 +108,8 @@ func (s *L1Reader) closeAll() { s.chanMutex.Lock() defer s.chanMutex.Unlock() + s.requiresPendingCallUpdates = 0 + for ch := range s.outChannels { delete(s.outChannels, ch) close(ch) @@ -118,15 +133,17 @@ func (s *L1Reader) possiblyBroadcast(h *types.Header) { s.lastBroadcastHeader = h } - pendingBlockNr, err := arbutil.GetPendingCallBlockNumber(s.GetContext(), s.client) - if err == nil && pendingBlockNr.IsUint64() { - pendingU64 := pendingBlockNr.Uint64() - if pendingU64 > s.lastPendingBlockNr { - broadcastThis = true - s.lastPendingBlockNr = pendingU64 + if s.requiresPendingCallUpdates > 0 { + pendingCallBlockNr, err := arbutil.GetPendingCallBlockNumber(s.GetContext(), s.client) + if err == nil && pendingCallBlockNr.IsUint64() { + pendingU64 := pendingCallBlockNr.Uint64() + if pendingU64 > s.lastPendingCallBlockNr { + broadcastThis = true + s.lastPendingCallBlockNr = pendingU64 + } + } else { + log.Warn("GetPendingCallBlockNr: bad result", "err", err, "number", pendingCallBlockNr) } - } else { - log.Warn("GetPendingBlockNr: bad result", "err", err, "number", pendingBlockNr) } if broadcastThis { @@ -203,19 +220,17 @@ func (s *L1Reader) broadcastLoop(ctx context.Context) { } func (s *L1Reader) WaitForTxApproval(ctxIn context.Context, tx *types.Transaction) (*types.Receipt, error) { - headerchan, unsubscribe := s.Subscribe() + headerchan, unsubscribe := s.Subscribe(true) defer unsubscribe() ctx, cancel := context.WithTimeout(ctxIn, s.config.TxTimeout) defer cancel() txHash := tx.Hash() for { receipt, err := s.client.TransactionReceipt(ctx, txHash) - if err == nil { - callBlockNr, err := arbutil.GetPendingCallBlockNumber(ctx, s.client) - if err != nil { - return nil, err - } - if callBlockNr.Cmp(receipt.BlockNumber) > 0 { + if err == nil && receipt.BlockNumber.IsUint64() { + receiptBlockNr := receipt.BlockNumber.Uint64() + callBlockNr := s.LastPendingCallBlockNr() + if callBlockNr > receiptBlockNr { return receipt, arbutil.DetailTxError(ctx, s.client, tx, receipt) } } @@ -240,6 +255,20 @@ func (s *L1Reader) LastHeader(ctx context.Context) (*types.Header, error) { return s.client.HeaderByNumber(ctx, nil) } +func (s *L1Reader) UpdatingPendingCallBlockNr() bool { + s.chanMutex.Lock() + defer s.chanMutex.Unlock() + return s.requiresPendingCallUpdates > 0 +} + +// blocknumber used by pending calls. +// only updated if UpdatingPendingCallBlockNr returns true +func (s *L1Reader) LastPendingCallBlockNr() uint64 { + s.chanMutex.Lock() + defer s.chanMutex.Unlock() + return s.lastPendingCallBlockNr +} + func (s *L1Reader) Client() arbutil.L1Interface { return s.client } diff --git a/arbnode/sequencer.go b/arbnode/sequencer.go index df645cc7c3..938a464552 100644 --- a/arbnode/sequencer.go +++ b/arbnode/sequencer.go @@ -303,7 +303,7 @@ func (s *Sequencer) Start(ctxIn context.Context) error { return errors.New("sequencer not initialized") } - headerChan, cancel := s.l1Reader.Subscribe() + headerChan, cancel := s.l1Reader.Subscribe(false) s.LaunchThread(func(ctx context.Context) { defer cancel() diff --git a/validator/stateless_block_validator.go b/validator/stateless_block_validator.go index 47b4052dd2..8b9615b45c 100644 --- a/validator/stateless_block_validator.go +++ b/validator/stateless_block_validator.go @@ -58,7 +58,7 @@ type InboxReaderInterface interface { type L1ReaderInterface interface { Client() arbutil.L1Interface - Subscribe() (<-chan *types.Header, func()) + Subscribe(bool) (<-chan *types.Header, func()) WaitForTxApproval(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) }