Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete messages from coordinator after they become final #2471

Merged
merged 17 commits into from
Aug 16, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 24 additions & 30 deletions arbnode/seq_coordinator.go
Original file line number Diff line number Diff line change
@@ -484,7 +484,12 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin
}
// Was, and still is, the active sequencer
// Before proceeding, first try deleting finalized messages from redis and setting the finalizedMsgCount key
if err := c.deleteFinalizedMsgsFromRedis(ctx); err != nil {
finalized, err := c.sync.GetFinalizedMsgCount(ctx)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this could fail if parent chain doesn't support finalized..
For now, I think a good solution would be to have a boolean (default: true) option that enables deleting finalized messages

if err != nil {
log.Warn("Error getting finalizedMessageCount from syncMonitor: %w", err)
} else if finalized != 0 {
log.Warn("SyncMonitor returned zero finalizedMessageCount")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a specific reason you are checking for zero?
A little after initialization it is a valid return value and people will ask us if they get warnings.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was checking for 0 because sequencer coordinator could be enabled without inboxreader/l1reader and l1reader is required to fetch finalizedMsgCount. In cases when inboxreader/l1reader is nil, sync monitor is made to return 0, nil this meant a reasonable return value to check if we should skip deleting of finalized messages

} else if err := c.deleteFinalizedMsgsFromRedis(ctx, finalized); err != nil {
log.Warn("Coordinator failed to delete finalized messages from redis", "err", err)
}
// We leave a margin of error of either a five times the update interval or a fifth of the lockout duration, whichever is greater.
@@ -506,11 +511,7 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin
return c.noRedisError()
}

func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context) error {
finalized, err := c.sync.GetFinalizedMsgCount(ctx)
if err != nil || finalized == 0 {
return fmt.Errorf("finalizedMessageCount is zero or error getting finalizedMessageCount from syncMonitor: %w", err)
}
func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context, finalized arbutil.MessageIndex) error {
updateFinalizedMsgCount := func() error {
finalizedBytes, err := c.msgCountToSignedBytes(finalized)
if err != nil {
@@ -522,35 +523,31 @@ func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context) error
return nil
}
prevFinalized, err := c.getRemoteFinalizedMsgCount(ctx)
if err != nil {
if errors.Is(err, redis.Nil) {
var keys []string
for msg := finalized; ; msg-- {
exists, err := c.Client.Exists(ctx, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)).Result()
if exists == 0 || err != nil {
break
}
keys = append(keys, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg))
if errors.Is(err, redis.Nil) {
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
var keys []string
for msg := finalized; ; msg-- {
exists, err := c.Client.Exists(ctx, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)).Result()
if exists == 0 || err != nil {
diegoximenes marked this conversation as resolved.
Show resolved Hide resolved
break
}
// If there is an error deleting finalized messages during init, we retry later either from this sequencer or from another
if len(keys) > 0 {
log.Info("Initializing finalizedMsgCount and deleting finalized messages from redis", "finalizedMsgCount", finalized)
if err := c.Client.Del(ctx, keys...).Err(); err != nil {
return fmt.Errorf("error deleting finalized message and their signatures from redis during init of finalizedMsgCount: %w", err)
}
keys = append(keys, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg))
}
// If there is an error deleting finalized messages during init, we retry later either from this sequencer or from another
if len(keys) > 0 {
log.Info("Initializing finalizedMsgCount and deleting finalized messages from redis", "finalizedMsgCount", finalized)
if err := c.Client.Del(ctx, keys...).Err(); err != nil {
return fmt.Errorf("error deleting finalized message and their signatures from redis during init of finalizedMsgCount: %w", err)
}
return updateFinalizedMsgCount()
}
return updateFinalizedMsgCount()
} else if err != nil {
return fmt.Errorf("error getting finalizedMsgCount value from redis: %w", err)
}
remoteMsgCount, err := c.GetRemoteMsgCount()
if err != nil {
return fmt.Errorf("cannot get remote message count: %w", err)
}
msgToDelete := finalized
if msgToDelete > remoteMsgCount {
msgToDelete = remoteMsgCount
}
msgToDelete := min(finalized, remoteMsgCount)
if prevFinalized < msgToDelete {
var keys []string
for msg := prevFinalized + 1; msg <= msgToDelete; msg++ {
@@ -603,10 +600,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
log.Warn("cannot get remote message count", "err", err)
return c.retryAfterRedisError()
}
readUntil := remoteMsgCount
if readUntil > localMsgCount+c.config.MsgPerPoll {
readUntil = localMsgCount + c.config.MsgPerPoll
}
readUntil := min(localMsgCount+c.config.MsgPerPoll, remoteMsgCount)
var messages []arbostypes.MessageWithMetadata
msgToRead := localMsgCount
var msgReadErr error
3 changes: 1 addition & 2 deletions arbnode/sync_monitor.go
Original file line number Diff line number Diff line change
@@ -2,7 +2,6 @@ package arbnode

import (
"context"
"errors"
"sync"
"time"

@@ -77,7 +76,7 @@ func (s *SyncMonitor) GetFinalizedMsgCount(ctx context.Context) (arbutil.Message
if s.inboxReader != nil && s.inboxReader.l1Reader != nil {
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
return s.inboxReader.GetFinalizedMsgCount(ctx)
}
return 0, errors.New("sync monitor's GetFinalizedMsgCount method is unsupported, try starting node with --parent-chain.connection.url")
return 0, nil
}

func (s *SyncMonitor) maxMessageCount() (arbutil.MessageIndex, error) {
4 changes: 4 additions & 0 deletions cmd/nitro/nitro.go
Original file line number Diff line number Diff line change
@@ -232,6 +232,10 @@ func mainImpl() int {
if nodeConfig.Execution.Sequencer.Enable != nodeConfig.Node.Sequencer {
log.Error("consensus and execution must agree if sequencing is enabled or not", "Execution.Sequencer.Enable", nodeConfig.Execution.Sequencer.Enable, "Node.Sequencer", nodeConfig.Node.Sequencer)
}
if nodeConfig.Node.SeqCoordinator.Enable && !nodeConfig.Node.ParentChainReader.Enable {
log.Error("Sequencer coordinator must be enabled with parent chain reader, try starting node with --parent-chain.connection.url")
return 1
}

var dataSigner signature.DataSignerFunc
var l1TransactionOptsValidator *bind.TransactOpts