Skip to content

Commit

Permalink
Showing 3 changed files with 29 additions and 32 deletions.
54 changes: 24 additions & 30 deletions arbnode/seq_coordinator.go
Original file line number Diff line number Diff line change
@@ -484,7 +484,12 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin
}
// Was, and still is, the active sequencer
// Before proceeding, first try deleting finalized messages from redis and setting the finalizedMsgCount key
if err := c.deleteFinalizedMsgsFromRedis(ctx); err != nil {
finalized, err := c.sync.GetFinalizedMsgCount(ctx)
if err != nil {
log.Warn("Error getting finalizedMessageCount from syncMonitor: %w", err)
} else if finalized != 0 {
log.Warn("SyncMonitor returned zero finalizedMessageCount")
} else if err := c.deleteFinalizedMsgsFromRedis(ctx, finalized); err != nil {
log.Warn("Coordinator failed to delete finalized messages from redis", "err", err)
}
// We leave a margin of error of either a five times the update interval or a fifth of the lockout duration, whichever is greater.
@@ -506,11 +511,7 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin
return c.noRedisError()
}

func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context) error {
finalized, err := c.sync.GetFinalizedMsgCount(ctx)
if err != nil || finalized == 0 {
return fmt.Errorf("finalizedMessageCount is zero or error getting finalizedMessageCount from syncMonitor: %w", err)
}
func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context, finalized arbutil.MessageIndex) error {
updateFinalizedMsgCount := func() error {
finalizedBytes, err := c.msgCountToSignedBytes(finalized)
if err != nil {
@@ -522,35 +523,31 @@ func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context) error
return nil
}
prevFinalized, err := c.getRemoteFinalizedMsgCount(ctx)
if err != nil {
if errors.Is(err, redis.Nil) {
var keys []string
for msg := finalized; ; msg-- {
exists, err := c.Client.Exists(ctx, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)).Result()
if exists == 0 || err != nil {
break
}
keys = append(keys, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg))
if errors.Is(err, redis.Nil) {
var keys []string
for msg := finalized; ; msg-- {
exists, err := c.Client.Exists(ctx, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)).Result()
if exists == 0 || err != nil {
break
}
// If there is an error deleting finalized messages during init, we retry later either from this sequencer or from another
if len(keys) > 0 {
log.Info("Initializing finalizedMsgCount and deleting finalized messages from redis", "finalizedMsgCount", finalized)
if err := c.Client.Del(ctx, keys...).Err(); err != nil {
return fmt.Errorf("error deleting finalized message and their signatures from redis during init of finalizedMsgCount: %w", err)
}
keys = append(keys, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg))
}
// If there is an error deleting finalized messages during init, we retry later either from this sequencer or from another
if len(keys) > 0 {
log.Info("Initializing finalizedMsgCount and deleting finalized messages from redis", "finalizedMsgCount", finalized)
if err := c.Client.Del(ctx, keys...).Err(); err != nil {
return fmt.Errorf("error deleting finalized message and their signatures from redis during init of finalizedMsgCount: %w", err)
}
return updateFinalizedMsgCount()
}
return updateFinalizedMsgCount()
} else if err != nil {
return fmt.Errorf("error getting finalizedMsgCount value from redis: %w", err)
}
remoteMsgCount, err := c.GetRemoteMsgCount()
if err != nil {
return fmt.Errorf("cannot get remote message count: %w", err)
}
msgToDelete := finalized
if msgToDelete > remoteMsgCount {
msgToDelete = remoteMsgCount
}
msgToDelete := min(finalized, remoteMsgCount)
if prevFinalized < msgToDelete {
var keys []string
for msg := prevFinalized + 1; msg <= msgToDelete; msg++ {
@@ -603,10 +600,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
log.Warn("cannot get remote message count", "err", err)
return c.retryAfterRedisError()
}
readUntil := remoteMsgCount
if readUntil > localMsgCount+c.config.MsgPerPoll {
readUntil = localMsgCount + c.config.MsgPerPoll
}
readUntil := min(localMsgCount+c.config.MsgPerPoll, remoteMsgCount)
var messages []arbostypes.MessageWithMetadata
msgToRead := localMsgCount
var msgReadErr error
3 changes: 1 addition & 2 deletions arbnode/sync_monitor.go
Original file line number Diff line number Diff line change
@@ -2,7 +2,6 @@ package arbnode

import (
"context"
"errors"
"sync"
"time"

@@ -77,7 +76,7 @@ func (s *SyncMonitor) GetFinalizedMsgCount(ctx context.Context) (arbutil.Message
if s.inboxReader != nil && s.inboxReader.l1Reader != nil {
return s.inboxReader.GetFinalizedMsgCount(ctx)
}
return 0, errors.New("sync monitor's GetFinalizedMsgCount method is unsupported, try starting node with --parent-chain.connection.url")
return 0, nil
}

func (s *SyncMonitor) maxMessageCount() (arbutil.MessageIndex, error) {
4 changes: 4 additions & 0 deletions cmd/nitro/nitro.go
Original file line number Diff line number Diff line change
@@ -232,6 +232,10 @@ func mainImpl() int {
if nodeConfig.Execution.Sequencer.Enable != nodeConfig.Node.Sequencer {
log.Error("consensus and execution must agree if sequencing is enabled or not", "Execution.Sequencer.Enable", nodeConfig.Execution.Sequencer.Enable, "Node.Sequencer", nodeConfig.Node.Sequencer)
}
if nodeConfig.Node.SeqCoordinator.Enable && !nodeConfig.Node.ParentChainReader.Enable {
log.Error("Sequencer coordinator must be enabled with parent chain reader, try starting node with --parent-chain.connection.url")
return 1
}

var dataSigner signature.DataSignerFunc
var l1TransactionOptsValidator *bind.TransactOpts

0 comments on commit ab8e6a8

Please sign in to comment.