From 9745003e289cfca1e22600f4489155e598b0c349 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Wed, 10 Jul 2024 10:51:52 -0500 Subject: [PATCH 1/8] Delete messages from coordinator after they become final --- arbnode/seq_coordinator.go | 80 ++++++++++++++++++++++++++++- arbnode/sync_monitor.go | 8 +++ util/redisutil/redis_coordinator.go | 13 ++--- 3 files changed, 93 insertions(+), 8 deletions(-) diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go index cdf1011b11..64b1ef9b81 100644 --- a/arbnode/seq_coordinator.go +++ b/arbnode/seq_coordinator.go @@ -39,6 +39,7 @@ type SeqCoordinator struct { redisutil.RedisCoordinator + sync *SyncMonitor streamer *TransactionStreamer sequencer execution.ExecutionSequencer delayedSequencer *DelayedSequencer @@ -104,7 +105,7 @@ var DefaultSeqCoordinatorConfig = SeqCoordinatorConfig{ RedisUrl: "", LockoutDuration: time.Minute, LockoutSpare: 30 * time.Second, - SeqNumDuration: 24 * time.Hour, + SeqNumDuration: 10 * 24 * time.Hour, UpdateInterval: 250 * time.Millisecond, HandoffTimeout: 30 * time.Second, SafeShutdownDelay: 5 * time.Second, @@ -149,6 +150,7 @@ func NewSeqCoordinator( } coordinator := &SeqCoordinator{ RedisCoordinator: *redisCoordinator, + sync: sync, streamer: streamer, sequencer: sequencer, config: config, @@ -338,6 +340,14 @@ func (c *SeqCoordinator) acquireLockoutAndWriteMessage(ctx context.Context, msgC return nil } +func (c *SeqCoordinator) getRemoteFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex, error) { + resStr, err := c.Client.Get(ctx, redisutil.FINALIZED_MSG_COUNT_KEY).Result() + if err != nil { + return 0, err + } + return c.signedBytesToMsgCount(ctx, []byte(resStr)) +} + func (c *SeqCoordinator) getRemoteMsgCountImpl(ctx context.Context, r redis.Cmdable) (arbutil.MessageIndex, error) { resStr, err := r.Get(ctx, redisutil.MSG_COUNT_KEY).Result() if errors.Is(err, redis.Nil) { @@ -473,6 +483,10 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin return c.noRedisError() } // Was, and still is, the active sequencer + // Before proceeding, first try deleting finalized messages from redis and setting the finalizedMsgCount key + if err := c.deleteFinalizedMsgsFromRedis(ctx); err != nil { + log.Warn("Coordinator failed to delete finalized messages from redis", "err", err) + } // We leave a margin of error of either a five times the update interval or a fifth of the lockout duration, whichever is greater. marginOfError := arbmath.MaxInt(c.config.LockoutDuration/5, c.config.UpdateInterval*5) if time.Now().Add(marginOfError).Before(atomicTimeRead(&c.lockoutUntil)) { @@ -492,6 +506,64 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin return c.noRedisError() } +func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context) error { + finalized, err := c.sync.GetFinalizedMsgCount(ctx) + if err != nil || finalized == 0 { + return fmt.Errorf("finalizedMessageCount is zero or error getting finalizedMessageCount from syncMonitor: %w", err) + } + updateFinalizedMsgCount := func() error { + finalizedBytes, err := c.msgCountToSignedBytes(finalized) + if err != nil { + return err + } + if err = c.Client.Set(ctx, redisutil.FINALIZED_MSG_COUNT_KEY, finalizedBytes, c.config.SeqNumDuration).Err(); err != nil { + return fmt.Errorf("couldn't set %s key to current finalizedMsgCount in redis: %w", redisutil.FINALIZED_MSG_COUNT_KEY, err) + } + return nil + } + prevFinalized, err := c.getRemoteFinalizedMsgCount(ctx) + if err != nil { + if errors.Is(err, redis.Nil) { + var keys []string + for msg := finalized; ; msg-- { + exists, err := c.Client.Exists(ctx, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)).Result() + if exists == 0 || err != nil { + break + } + keys = append(keys, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)) + } + // If there is an error deleting finalized messages during init, we retry later either from this sequencer or from another + if len(keys) > 0 { + log.Info("Initializing finalizedMsgCount and deleting finalized messages from redis", "finalizedMsgCount", finalized) + if err := c.Client.Del(ctx, keys...).Err(); err != nil { + return fmt.Errorf("error deleting finalized message and their signatures from redis during init of finalizedMsgCount: %w", err) + } + } + return updateFinalizedMsgCount() + } + return fmt.Errorf("error getting finalizedMsgCount value from redis: %w", err) + } + remoteMsgCount, err := c.GetRemoteMsgCount() + if err != nil { + return fmt.Errorf("cannot get remote message count: %w", err) + } + msgToDelete := finalized + if msgToDelete > remoteMsgCount { + msgToDelete = remoteMsgCount + } + if prevFinalized < msgToDelete { + var keys []string + for msg := prevFinalized + 1; msg <= msgToDelete; msg++ { + keys = append(keys, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)) + } + if err := c.Client.Del(ctx, keys...).Err(); err != nil { + return fmt.Errorf("error deleting finalized message and their signatures from redis: %w", err) + } + return updateFinalizedMsgCount() + } + return nil +} + func (c *SeqCoordinator) update(ctx context.Context) time.Duration { chosenSeq, err := c.RecommendSequencerWantingLockout(ctx) if err != nil { @@ -522,6 +594,10 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { log.Error("cannot read message count", "err", err) return c.config.UpdateInterval } + remoteFinalizedMsgCount, err := c.getRemoteFinalizedMsgCount(ctx) + if err != nil { + log.Warn("Cannot get remote finalized message count, might encounter failed to read message warnings later", "err", err) + } remoteMsgCount, err := c.GetRemoteMsgCount() if err != nil { log.Warn("cannot get remote message count", "err", err) @@ -534,7 +610,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { var messages []arbostypes.MessageWithMetadata msgToRead := localMsgCount var msgReadErr error - for msgToRead < readUntil { + for msgToRead < readUntil && localMsgCount >= remoteFinalizedMsgCount { var resString string resString, msgReadErr = c.Client.Get(ctx, redisutil.MessageKeyFor(msgToRead)).Result() if msgReadErr != nil { diff --git a/arbnode/sync_monitor.go b/arbnode/sync_monitor.go index d3b9a7e1c6..27da6b7331 100644 --- a/arbnode/sync_monitor.go +++ b/arbnode/sync_monitor.go @@ -2,6 +2,7 @@ package arbnode import ( "context" + "errors" "sync" "time" @@ -72,6 +73,13 @@ func (s *SyncMonitor) SyncTargetMessageCount() arbutil.MessageIndex { return s.syncTarget } +func (s *SyncMonitor) GetFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex, error) { + if s.inboxReader != nil && s.inboxReader.l1Reader != nil { + return s.inboxReader.GetFinalizedMsgCount(ctx) + } + return 0, errors.New("sync monitor's GetFinalizedMsgCount method is unsupported, try starting node with --parent-chain.connection.url") +} + func (s *SyncMonitor) maxMessageCount() (arbutil.MessageIndex, error) { msgCount, err := s.txStreamer.GetMessageCount() if err != nil { diff --git a/util/redisutil/redis_coordinator.go b/util/redisutil/redis_coordinator.go index 59e3b0e0f9..2c12ffec50 100644 --- a/util/redisutil/redis_coordinator.go +++ b/util/redisutil/redis_coordinator.go @@ -13,12 +13,13 @@ import ( "github.com/offchainlabs/nitro/arbutil" ) -const CHOSENSEQ_KEY string = "coordinator.chosen" // Never overwritten. Expires or released only -const MSG_COUNT_KEY string = "coordinator.msgCount" // Only written by sequencer holding CHOSEN key -const PRIORITIES_KEY string = "coordinator.priorities" // Read only -const WANTS_LOCKOUT_KEY_PREFIX string = "coordinator.liveliness." // Per server. Only written by self -const MESSAGE_KEY_PREFIX string = "coordinator.msg." // Per Message. Only written by sequencer holding CHOSEN -const SIGNATURE_KEY_PREFIX string = "coordinator.msg.sig." // Per Message. Only written by sequencer holding CHOSEN +const CHOSENSEQ_KEY string = "coordinator.chosen" // Never overwritten. Expires or released only +const MSG_COUNT_KEY string = "coordinator.msgCount" // Only written by sequencer holding CHOSEN key +const FINALIZED_MSG_COUNT_KEY string = "coordinator.finalizedMsgCount" // Only written by sequencer holding CHOSEN key +const PRIORITIES_KEY string = "coordinator.priorities" // Read only +const WANTS_LOCKOUT_KEY_PREFIX string = "coordinator.liveliness." // Per server. Only written by self +const MESSAGE_KEY_PREFIX string = "coordinator.msg." // Per Message. Only written by sequencer holding CHOSEN +const SIGNATURE_KEY_PREFIX string = "coordinator.msg.sig." // Per Message. Only written by sequencer holding CHOSEN const WANTS_LOCKOUT_VAL string = "OK" const INVALID_VAL string = "INVALID" const INVALID_URL string = "" From ab8e6a8d0d4c80ffdca71803fd6e4d8efe2d2629 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Mon, 15 Jul 2024 17:47:33 -0500 Subject: [PATCH 2/8] address PR comments --- arbnode/seq_coordinator.go | 54 +++++++++++++++++--------------------- arbnode/sync_monitor.go | 3 +-- cmd/nitro/nitro.go | 4 +++ 3 files changed, 29 insertions(+), 32 deletions(-) diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go index 64b1ef9b81..5fd604842f 100644 --- a/arbnode/seq_coordinator.go +++ b/arbnode/seq_coordinator.go @@ -484,7 +484,12 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin } // Was, and still is, the active sequencer // Before proceeding, first try deleting finalized messages from redis and setting the finalizedMsgCount key - if err := c.deleteFinalizedMsgsFromRedis(ctx); err != nil { + finalized, err := c.sync.GetFinalizedMsgCount(ctx) + if err != nil { + log.Warn("Error getting finalizedMessageCount from syncMonitor: %w", err) + } else if finalized != 0 { + log.Warn("SyncMonitor returned zero finalizedMessageCount") + } else if err := c.deleteFinalizedMsgsFromRedis(ctx, finalized); err != nil { log.Warn("Coordinator failed to delete finalized messages from redis", "err", err) } // We leave a margin of error of either a five times the update interval or a fifth of the lockout duration, whichever is greater. @@ -506,11 +511,7 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin return c.noRedisError() } -func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context) error { - finalized, err := c.sync.GetFinalizedMsgCount(ctx) - if err != nil || finalized == 0 { - return fmt.Errorf("finalizedMessageCount is zero or error getting finalizedMessageCount from syncMonitor: %w", err) - } +func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context, finalized arbutil.MessageIndex) error { updateFinalizedMsgCount := func() error { finalizedBytes, err := c.msgCountToSignedBytes(finalized) if err != nil { @@ -522,35 +523,31 @@ func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context) error return nil } prevFinalized, err := c.getRemoteFinalizedMsgCount(ctx) - if err != nil { - if errors.Is(err, redis.Nil) { - var keys []string - for msg := finalized; ; msg-- { - exists, err := c.Client.Exists(ctx, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)).Result() - if exists == 0 || err != nil { - break - } - keys = append(keys, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)) + if errors.Is(err, redis.Nil) { + var keys []string + for msg := finalized; ; msg-- { + exists, err := c.Client.Exists(ctx, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)).Result() + if exists == 0 || err != nil { + break } - // If there is an error deleting finalized messages during init, we retry later either from this sequencer or from another - if len(keys) > 0 { - log.Info("Initializing finalizedMsgCount and deleting finalized messages from redis", "finalizedMsgCount", finalized) - if err := c.Client.Del(ctx, keys...).Err(); err != nil { - return fmt.Errorf("error deleting finalized message and their signatures from redis during init of finalizedMsgCount: %w", err) - } + keys = append(keys, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)) + } + // If there is an error deleting finalized messages during init, we retry later either from this sequencer or from another + if len(keys) > 0 { + log.Info("Initializing finalizedMsgCount and deleting finalized messages from redis", "finalizedMsgCount", finalized) + if err := c.Client.Del(ctx, keys...).Err(); err != nil { + return fmt.Errorf("error deleting finalized message and their signatures from redis during init of finalizedMsgCount: %w", err) } - return updateFinalizedMsgCount() } + return updateFinalizedMsgCount() + } else if err != nil { return fmt.Errorf("error getting finalizedMsgCount value from redis: %w", err) } remoteMsgCount, err := c.GetRemoteMsgCount() if err != nil { return fmt.Errorf("cannot get remote message count: %w", err) } - msgToDelete := finalized - if msgToDelete > remoteMsgCount { - msgToDelete = remoteMsgCount - } + msgToDelete := min(finalized, remoteMsgCount) if prevFinalized < msgToDelete { var keys []string for msg := prevFinalized + 1; msg <= msgToDelete; msg++ { @@ -603,10 +600,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { log.Warn("cannot get remote message count", "err", err) return c.retryAfterRedisError() } - readUntil := remoteMsgCount - if readUntil > localMsgCount+c.config.MsgPerPoll { - readUntil = localMsgCount + c.config.MsgPerPoll - } + readUntil := min(localMsgCount+c.config.MsgPerPoll, remoteMsgCount) var messages []arbostypes.MessageWithMetadata msgToRead := localMsgCount var msgReadErr error diff --git a/arbnode/sync_monitor.go b/arbnode/sync_monitor.go index 27da6b7331..5ab1ede2d6 100644 --- a/arbnode/sync_monitor.go +++ b/arbnode/sync_monitor.go @@ -2,7 +2,6 @@ package arbnode import ( "context" - "errors" "sync" "time" @@ -77,7 +76,7 @@ func (s *SyncMonitor) GetFinalizedMsgCount(ctx context.Context) (arbutil.Message if s.inboxReader != nil && s.inboxReader.l1Reader != nil { return s.inboxReader.GetFinalizedMsgCount(ctx) } - return 0, errors.New("sync monitor's GetFinalizedMsgCount method is unsupported, try starting node with --parent-chain.connection.url") + return 0, nil } func (s *SyncMonitor) maxMessageCount() (arbutil.MessageIndex, error) { diff --git a/cmd/nitro/nitro.go b/cmd/nitro/nitro.go index 04bdeb3228..45cb6a01c7 100644 --- a/cmd/nitro/nitro.go +++ b/cmd/nitro/nitro.go @@ -232,6 +232,10 @@ func mainImpl() int { if nodeConfig.Execution.Sequencer.Enable != nodeConfig.Node.Sequencer { log.Error("consensus and execution must agree if sequencing is enabled or not", "Execution.Sequencer.Enable", nodeConfig.Execution.Sequencer.Enable, "Node.Sequencer", nodeConfig.Node.Sequencer) } + if nodeConfig.Node.SeqCoordinator.Enable && !nodeConfig.Node.ParentChainReader.Enable { + log.Error("Sequencer coordinator must be enabled with parent chain reader, try starting node with --parent-chain.connection.url") + return 1 + } var dataSigner signature.DataSignerFunc var l1TransactionOptsValidator *bind.TransactOpts From 4cff8b3f4336347845e476460fff2fe8ff86dd79 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Tue, 16 Jul 2024 10:56:03 -0500 Subject: [PATCH 3/8] add test for deleteFinalizedMsgsFromRedis --- arbnode/seq_coordinator.go | 8 +- ...atomic_test.go => seq_coordinator_test.go} | 84 +++++++++++++++++++ 2 files changed, 88 insertions(+), 4 deletions(-) rename arbnode/{seq_coordinator_atomic_test.go => seq_coordinator_test.go} (60%) diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go index 5fd604842f..2f5e724f61 100644 --- a/arbnode/seq_coordinator.go +++ b/arbnode/seq_coordinator.go @@ -525,7 +525,7 @@ func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context, final prevFinalized, err := c.getRemoteFinalizedMsgCount(ctx) if errors.Is(err, redis.Nil) { var keys []string - for msg := finalized; ; msg-- { + for msg := finalized; msg > 0; msg-- { exists, err := c.Client.Exists(ctx, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)).Result() if exists == 0 || err != nil { break @@ -543,11 +543,11 @@ func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context, final } else if err != nil { return fmt.Errorf("error getting finalizedMsgCount value from redis: %w", err) } - remoteMsgCount, err := c.GetRemoteMsgCount() + remoteMsgCount, err := c.getRemoteMsgCountImpl(ctx, c.Client) if err != nil { return fmt.Errorf("cannot get remote message count: %w", err) } - msgToDelete := min(finalized, remoteMsgCount) + msgToDelete := min(finalized, remoteMsgCount-1) if prevFinalized < msgToDelete { var keys []string for msg := prevFinalized + 1; msg <= msgToDelete; msg++ { @@ -604,7 +604,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { var messages []arbostypes.MessageWithMetadata msgToRead := localMsgCount var msgReadErr error - for msgToRead < readUntil && localMsgCount >= remoteFinalizedMsgCount { + for msgToRead < readUntil && localMsgCount > remoteFinalizedMsgCount { var resString string resString, msgReadErr = c.Client.Get(ctx, redisutil.MessageKeyFor(msgToRead)).Result() if msgReadErr != nil { diff --git a/arbnode/seq_coordinator_atomic_test.go b/arbnode/seq_coordinator_test.go similarity index 60% rename from arbnode/seq_coordinator_atomic_test.go rename to arbnode/seq_coordinator_test.go index 61468a3adb..6fa08ce7a1 100644 --- a/arbnode/seq_coordinator_atomic_test.go +++ b/arbnode/seq_coordinator_test.go @@ -156,3 +156,87 @@ func TestRedisSeqCoordinatorAtomic(t *testing.T) { } } + +func TestSeqCoordinatorDeletesFinalizedMessages(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + coordConfig := TestSeqCoordinatorConfig + coordConfig.LockoutDuration = time.Millisecond * 100 + coordConfig.LockoutSpare = time.Millisecond * 10 + coordConfig.Signer.ECDSA.AcceptSequencer = false + coordConfig.Signer.SymmetricFallback = true + coordConfig.Signer.SymmetricSign = true + coordConfig.Signer.Symmetric.Dangerous.DisableSignatureVerification = true + coordConfig.Signer.Symmetric.SigningKey = "" + + nullSigner, err := signature.NewSignVerify(&coordConfig.Signer, nil, nil) + Require(t, err) + + redisUrl := redisutil.CreateTestRedis(ctx, t) + coordConfig.RedisUrl = redisUrl + + config := coordConfig + config.MyUrl = "test" + redisCoordinator, err := redisutil.NewRedisCoordinator(config.RedisUrl) + Require(t, err) + coordinator := &SeqCoordinator{ + RedisCoordinator: *redisCoordinator, + config: config, + signer: nullSigner, + } + + // Add messages to redis + var keys []string + msgBytes, err := coordinator.msgCountToSignedBytes(0) + Require(t, err) + for i := arbutil.MessageIndex(1); i <= 10; i++ { + err = coordinator.Client.Set(ctx, redisutil.MessageKeyFor(i), msgBytes, time.Hour).Err() + Require(t, err) + err = coordinator.Client.Set(ctx, redisutil.MessageSigKeyFor(i), msgBytes, time.Hour).Err() + Require(t, err) + keys = append(keys, redisutil.MessageKeyFor(i), redisutil.MessageSigKeyFor(i)) + } + // Set msgCount key + msgCountBytes, err := coordinator.msgCountToSignedBytes(11) + Require(t, err) + err = coordinator.Client.Set(ctx, redisutil.MSG_COUNT_KEY, msgCountBytes, time.Hour).Err() + Require(t, err) + exists, err := coordinator.Client.Exists(ctx, keys...).Result() + Require(t, err) + if exists != 20 { + t.Fatal("couldn't find all messages and signatures in redis") + } + + // Set finalizedMsgCount and delete finalized messages + err = coordinator.deleteFinalizedMsgsFromRedis(ctx, 5) + Require(t, err) + + // Check if messages and signatures were deleted successfully + exists, err = coordinator.Client.Exists(ctx, keys[:10]...).Result() + Require(t, err) + if exists != 0 { + t.Fatal("finalized messages and signatures in range 1 to 5 were not deleted") + } + + // Check if finalizedMsgCount was set to correct value + finalized, err := coordinator.getRemoteFinalizedMsgCount(ctx) + Require(t, err) + if finalized != 5 { + t.Fatalf("incorrect finalizedMsgCount, want: 5, have: %d", finalized) + } + + // Try deleting finalized messages when theres already a finalizedMsgCount + err = coordinator.deleteFinalizedMsgsFromRedis(ctx, 7) + Require(t, err) + exists, err = coordinator.Client.Exists(ctx, keys[10:14]...).Result() + Require(t, err) + if exists != 0 { + t.Fatal("finalized messages and signatures in range 6 to 7 were not deleted") + } + finalized, err = coordinator.getRemoteFinalizedMsgCount(ctx) + Require(t, err) + if finalized != 7 { + t.Fatalf("incorrect finalizedMsgCount, want: 7, have: %d", finalized) + } +} From e6cd9a4c199d43a95ab6c06134697073d6914800 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Tue, 16 Jul 2024 11:18:35 -0500 Subject: [PATCH 4/8] minor bug fix- finalizedMsgCount should be non-inclusive --- arbnode/seq_coordinator.go | 8 ++++---- arbnode/seq_coordinator_test.go | 15 +++++++++++---- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go index 53d430e584..0e1344cb08 100644 --- a/arbnode/seq_coordinator.go +++ b/arbnode/seq_coordinator.go @@ -487,7 +487,7 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin finalized, err := c.sync.GetFinalizedMsgCount(ctx) if err != nil { log.Warn("Error getting finalizedMessageCount from syncMonitor: %w", err) - } else if finalized != 0 { + } else if finalized == 0 { log.Warn("SyncMonitor returned zero finalizedMessageCount") } else if err := c.deleteFinalizedMsgsFromRedis(ctx, finalized); err != nil { log.Warn("Coordinator failed to delete finalized messages from redis", "err", err) @@ -525,7 +525,7 @@ func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context, final prevFinalized, err := c.getRemoteFinalizedMsgCount(ctx) if errors.Is(err, redis.Nil) { var keys []string - for msg := finalized; msg > 0; msg-- { + for msg := finalized - 1; msg > 0; msg-- { exists, err := c.Client.Exists(ctx, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)).Result() if exists == 0 || err != nil { break @@ -547,10 +547,10 @@ func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context, final if err != nil { return fmt.Errorf("cannot get remote message count: %w", err) } - msgToDelete := min(finalized, remoteMsgCount-1) + msgToDelete := min(finalized, remoteMsgCount) if prevFinalized < msgToDelete { var keys []string - for msg := prevFinalized + 1; msg <= msgToDelete; msg++ { + for msg := prevFinalized; msg < msgToDelete; msg++ { keys = append(keys, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)) } if err := c.Client.Del(ctx, keys...).Err(); err != nil { diff --git a/arbnode/seq_coordinator_test.go b/arbnode/seq_coordinator_test.go index 64feeff29c..6498543f3a 100644 --- a/arbnode/seq_coordinator_test.go +++ b/arbnode/seq_coordinator_test.go @@ -213,10 +213,10 @@ func TestSeqCoordinatorDeletesFinalizedMessages(t *testing.T) { Require(t, err) // Check if messages and signatures were deleted successfully - exists, err = coordinator.Client.Exists(ctx, keys[:10]...).Result() + exists, err = coordinator.Client.Exists(ctx, keys[:8]...).Result() Require(t, err) if exists != 0 { - t.Fatal("finalized messages and signatures in range 1 to 5 were not deleted") + t.Fatal("finalized messages and signatures in range 1 to 4 were not deleted") } // Check if finalizedMsgCount was set to correct value @@ -229,14 +229,21 @@ func TestSeqCoordinatorDeletesFinalizedMessages(t *testing.T) { // Try deleting finalized messages when theres already a finalizedMsgCount err = coordinator.deleteFinalizedMsgsFromRedis(ctx, 7) Require(t, err) - exists, err = coordinator.Client.Exists(ctx, keys[10:14]...).Result() + exists, err = coordinator.Client.Exists(ctx, keys[8:12]...).Result() Require(t, err) if exists != 0 { - t.Fatal("finalized messages and signatures in range 6 to 7 were not deleted") + t.Fatal("finalized messages and signatures in range 5 to 6 were not deleted") } finalized, err = coordinator.getRemoteFinalizedMsgCount(ctx) Require(t, err) if finalized != 7 { t.Fatalf("incorrect finalizedMsgCount, want: 7, have: %d", finalized) } + + // Check that non-finalized messages are still available in redis + exists, err = coordinator.Client.Exists(ctx, keys[12:]...).Result() + Require(t, err) + if exists != 8 { + t.Fatal("non-finalized messages and signatures in range 7 to 10 are not fully available") + } } From 08c7e9e67df4c719077526c31c901d667541b3b3 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Wed, 31 Jul 2024 11:43:32 +0530 Subject: [PATCH 5/8] code refactor --- arbnode/seq_coordinator.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go index 0e1344cb08..bc2ef93991 100644 --- a/arbnode/seq_coordinator.go +++ b/arbnode/seq_coordinator.go @@ -512,7 +512,12 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin } func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context, finalized arbutil.MessageIndex) error { - updateFinalizedMsgCount := func() error { + deleteMsgsAndUpdateFinalizedMsgCount := func(keys []string) error { + if len(keys) > 0 { + if err := c.Client.Del(ctx, keys...).Err(); err != nil { + return fmt.Errorf("error deleting finalized messages and their signatures from redis: %w", err) + } + } finalizedBytes, err := c.msgCountToSignedBytes(finalized) if err != nil { return err @@ -527,19 +532,17 @@ func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context, final var keys []string for msg := finalized - 1; msg > 0; msg-- { exists, err := c.Client.Exists(ctx, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)).Result() - if exists == 0 || err != nil { + if err != nil { + // If there is an error deleting finalized messages during init, we retry later either from this sequencer or from another + return err + } + if exists == 0 { break } keys = append(keys, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)) } - // If there is an error deleting finalized messages during init, we retry later either from this sequencer or from another - if len(keys) > 0 { - log.Info("Initializing finalizedMsgCount and deleting finalized messages from redis", "finalizedMsgCount", finalized) - if err := c.Client.Del(ctx, keys...).Err(); err != nil { - return fmt.Errorf("error deleting finalized message and their signatures from redis during init of finalizedMsgCount: %w", err) - } - } - return updateFinalizedMsgCount() + log.Info("Initializing finalizedMsgCount and deleting finalized messages from redis", "finalizedMsgCount", finalized) + return deleteMsgsAndUpdateFinalizedMsgCount(keys) } else if err != nil { return fmt.Errorf("error getting finalizedMsgCount value from redis: %w", err) } @@ -553,10 +556,7 @@ func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context, final for msg := prevFinalized; msg < msgToDelete; msg++ { keys = append(keys, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)) } - if err := c.Client.Del(ctx, keys...).Err(); err != nil { - return fmt.Errorf("error deleting finalized message and their signatures from redis: %w", err) - } - return updateFinalizedMsgCount() + return deleteMsgsAndUpdateFinalizedMsgCount(keys) } return nil } From 379bf7e50ee71b203dae71dd12273062ba6f3dcc Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Thu, 1 Aug 2024 21:08:37 +0530 Subject: [PATCH 6/8] minor fix --- arbnode/seq_coordinator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go index bc2ef93991..dad271c168 100644 --- a/arbnode/seq_coordinator.go +++ b/arbnode/seq_coordinator.go @@ -604,7 +604,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { var messages []arbostypes.MessageWithMetadata msgToRead := localMsgCount var msgReadErr error - for msgToRead < readUntil && localMsgCount > remoteFinalizedMsgCount { + for msgToRead < readUntil && localMsgCount >= remoteFinalizedMsgCount { var resString string resString, msgReadErr = c.Client.Get(ctx, redisutil.MessageKeyFor(msgToRead)).Result() if msgReadErr != nil { From 4848a05a60bf0a70e5e63908dab1bc7f67e90977 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Thu, 8 Aug 2024 16:37:09 +0530 Subject: [PATCH 7/8] address PR comments --- arbnode/seq_coordinator.go | 70 +++++++++++++++++++++++--------------- 1 file changed, 43 insertions(+), 27 deletions(-) diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go index dad271c168..e3eb9f1df1 100644 --- a/arbnode/seq_coordinator.go +++ b/arbnode/seq_coordinator.go @@ -70,9 +70,10 @@ type SeqCoordinatorConfig struct { SafeShutdownDelay time.Duration `koanf:"safe-shutdown-delay"` ReleaseRetries int `koanf:"release-retries"` // Max message per poll. - MsgPerPoll arbutil.MessageIndex `koanf:"msg-per-poll"` - MyUrl string `koanf:"my-url"` - Signer signature.SignVerifyConfig `koanf:"signer"` + MsgPerPoll arbutil.MessageIndex `koanf:"msg-per-poll"` + MyUrl string `koanf:"my-url"` + DeleteFinalizedMsgs bool `koanf:"delete-finalized-msgs"` + Signer signature.SignVerifyConfig `koanf:"signer"` } func (c *SeqCoordinatorConfig) Url() string { @@ -96,6 +97,7 @@ func SeqCoordinatorConfigAddOptions(prefix string, f *flag.FlagSet) { f.Int(prefix+".release-retries", DefaultSeqCoordinatorConfig.ReleaseRetries, "the number of times to retry releasing the wants lockout and chosen one status on shutdown") f.Uint64(prefix+".msg-per-poll", uint64(DefaultSeqCoordinatorConfig.MsgPerPoll), "will only be marked as wanting the lockout if not too far behind") f.String(prefix+".my-url", DefaultSeqCoordinatorConfig.MyUrl, "url for this sequencer if it is the chosen") + f.Bool(prefix+".delete-finalized-msgs", DefaultSeqCoordinatorConfig.DeleteFinalizedMsgs, "enable deleting of finalized messages from redis") signature.SignVerifyConfigAddOptions(prefix+".signer", f) } @@ -113,23 +115,25 @@ var DefaultSeqCoordinatorConfig = SeqCoordinatorConfig{ RetryInterval: 50 * time.Millisecond, MsgPerPoll: 2000, MyUrl: redisutil.INVALID_URL, + DeleteFinalizedMsgs: true, Signer: signature.DefaultSignVerifyConfig, } var TestSeqCoordinatorConfig = SeqCoordinatorConfig{ - Enable: false, - RedisUrl: "", - LockoutDuration: time.Second * 2, - LockoutSpare: time.Millisecond * 10, - SeqNumDuration: time.Minute * 10, - UpdateInterval: time.Millisecond * 10, - HandoffTimeout: time.Millisecond * 200, - SafeShutdownDelay: time.Millisecond * 100, - ReleaseRetries: 4, - RetryInterval: time.Millisecond * 3, - MsgPerPoll: 20, - MyUrl: redisutil.INVALID_URL, - Signer: signature.DefaultSignVerifyConfig, + Enable: false, + RedisUrl: "", + LockoutDuration: time.Second * 2, + LockoutSpare: time.Millisecond * 10, + SeqNumDuration: time.Minute * 10, + UpdateInterval: time.Millisecond * 10, + HandoffTimeout: time.Millisecond * 200, + SafeShutdownDelay: time.Millisecond * 100, + ReleaseRetries: 4, + RetryInterval: time.Millisecond * 3, + MsgPerPoll: 20, + MyUrl: redisutil.INVALID_URL, + DeleteFinalizedMsgs: true, + Signer: signature.DefaultSignVerifyConfig, } func NewSeqCoordinator( @@ -483,14 +487,16 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin return c.noRedisError() } // Was, and still is, the active sequencer - // Before proceeding, first try deleting finalized messages from redis and setting the finalizedMsgCount key - finalized, err := c.sync.GetFinalizedMsgCount(ctx) - if err != nil { - log.Warn("Error getting finalizedMessageCount from syncMonitor: %w", err) - } else if finalized == 0 { - log.Warn("SyncMonitor returned zero finalizedMessageCount") - } else if err := c.deleteFinalizedMsgsFromRedis(ctx, finalized); err != nil { - log.Warn("Coordinator failed to delete finalized messages from redis", "err", err) + if c.config.DeleteFinalizedMsgs { + // Before proceeding, first try deleting finalized messages from redis and setting the finalizedMsgCount key + finalized, err := c.sync.GetFinalizedMsgCount(ctx) + if err != nil { + log.Warn("Error getting finalizedMessageCount from syncMonitor: %w", err) + } else if finalized == 0 { + log.Warn("SyncMonitor returned zero finalizedMessageCount") + } else if err := c.deleteFinalizedMsgsFromRedis(ctx, finalized); err != nil { + log.Warn("Coordinator failed to delete finalized messages from redis", "err", err) + } } // We leave a margin of error of either a five times the update interval or a fifth of the lockout duration, whichever is greater. marginOfError := arbmath.MaxInt(c.config.LockoutDuration/5, c.config.UpdateInterval*5) @@ -514,8 +520,14 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context, finalized arbutil.MessageIndex) error { deleteMsgsAndUpdateFinalizedMsgCount := func(keys []string) error { if len(keys) > 0 { - if err := c.Client.Del(ctx, keys...).Err(); err != nil { - return fmt.Errorf("error deleting finalized messages and their signatures from redis: %w", err) + // To support cases during init we delete keys from reverse (i.e lowest seq num first), so that even if deletion fails in one of the iterations + // next time deleteFinalizedMsgsFromRedis is called we dont miss undeleted messages, as exists is checked from higher seqnum to lower. + // In non-init cases it doesn't matter how we delete as we always try to delete from prevFinalized to finalized + batchDeleteCount := 1000 + for i := len(keys); i > 0; i -= batchDeleteCount { + if err := c.Client.Del(ctx, keys[max(0, i-batchDeleteCount):i]...).Err(); err != nil { + return fmt.Errorf("error deleting finalized messages and their signatures from redis: %w", err) + } } } finalizedBytes, err := c.msgCountToSignedBytes(finalized) @@ -593,7 +605,11 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { } remoteFinalizedMsgCount, err := c.getRemoteFinalizedMsgCount(ctx) if err != nil { - log.Warn("Cannot get remote finalized message count, might encounter failed to read message warnings later", "err", err) + loglevel := log.Error + if err == redis.Nil { + loglevel = log.Debug + } + loglevel("Cannot get remote finalized message count, might encounter failed to read message warnings later", "err", err) } remoteMsgCount, err := c.GetRemoteMsgCount() if err != nil { From 45eaee937046d45ee52334f879ec66055c4d585c Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Thu, 8 Aug 2024 16:39:33 +0530 Subject: [PATCH 8/8] fix lint --- arbnode/seq_coordinator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go index e3eb9f1df1..a582b64ffa 100644 --- a/arbnode/seq_coordinator.go +++ b/arbnode/seq_coordinator.go @@ -606,7 +606,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { remoteFinalizedMsgCount, err := c.getRemoteFinalizedMsgCount(ctx) if err != nil { loglevel := log.Error - if err == redis.Nil { + if errors.Is(err, redis.Nil) { loglevel = log.Debug } loglevel("Cannot get remote finalized message count, might encounter failed to read message warnings later", "err", err)