Skip to content

Commit

Permalink
Merge branch 'master' into db-conversion
Browse files Browse the repository at this point in the history
  • Loading branch information
tsahee authored Aug 16, 2024
2 parents bc8803a + 5d600df commit 93aaaef
Show file tree
Hide file tree
Showing 15 changed files with 1,365 additions and 95 deletions.
9 changes: 5 additions & 4 deletions arbitrator/wasm-libraries/user-host-trait/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ pub trait UserHost<DR: DataReader>: GasMeteredMachine {
fn return_data_size(&mut self) -> Result<u32, Self::Err> {
self.buy_ink(HOSTIO_INK)?;
let len = *self.evm_return_data_len();
trace!("return_data_size", self, be!(len), &[], len)
trace!("return_data_size", self, &[], be!(len), len)
}

/// Emits an EVM log with the given number of topics and data, the first bytes of which should
Expand Down Expand Up @@ -629,7 +629,8 @@ pub trait UserHost<DR: DataReader>: GasMeteredMachine {
self.buy_gas(gas_cost)?;

let code = code.slice();
trace!("account_code_size", self, address, &[], code.len() as u32)
let len = code.len() as u32;
trace!("account_code_size", self, address, be!(len), len)
}

/// Gets the code hash of the account at the given address. The semantics are equivalent
Expand Down Expand Up @@ -735,7 +736,7 @@ pub trait UserHost<DR: DataReader>: GasMeteredMachine {
fn evm_gas_left(&mut self) -> Result<u64, Self::Err> {
self.buy_ink(HOSTIO_INK)?;
let gas = self.gas_left()?;
trace!("evm_gas_left", self, be!(gas), &[], gas)
trace!("evm_gas_left", self, &[], be!(gas), gas)
}

/// Gets the amount of ink remaining after paying for the cost of this hostio. The semantics
Expand All @@ -747,7 +748,7 @@ pub trait UserHost<DR: DataReader>: GasMeteredMachine {
fn evm_ink_left(&mut self) -> Result<u64, Self::Err> {
self.buy_ink(HOSTIO_INK)?;
let ink = self.ink_ready()?;
trace!("evm_ink_left", self, be!(ink), &[], ink)
trace!("evm_ink_left", self, &[], be!(ink), ink)
}

/// Computes `value ÷ exponent` using 256-bit math, writing the result to the first.
Expand Down
130 changes: 108 additions & 22 deletions arbnode/seq_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type SeqCoordinator struct {

redisutil.RedisCoordinator

sync *SyncMonitor
streamer *TransactionStreamer
sequencer execution.ExecutionSequencer
delayedSequencer *DelayedSequencer
Expand Down Expand Up @@ -69,9 +70,10 @@ type SeqCoordinatorConfig struct {
SafeShutdownDelay time.Duration `koanf:"safe-shutdown-delay"`
ReleaseRetries int `koanf:"release-retries"`
// Max message per poll.
MsgPerPoll arbutil.MessageIndex `koanf:"msg-per-poll"`
MyUrl string `koanf:"my-url"`
Signer signature.SignVerifyConfig `koanf:"signer"`
MsgPerPoll arbutil.MessageIndex `koanf:"msg-per-poll"`
MyUrl string `koanf:"my-url"`
DeleteFinalizedMsgs bool `koanf:"delete-finalized-msgs"`
Signer signature.SignVerifyConfig `koanf:"signer"`
}

func (c *SeqCoordinatorConfig) Url() string {
Expand All @@ -95,6 +97,7 @@ func SeqCoordinatorConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Int(prefix+".release-retries", DefaultSeqCoordinatorConfig.ReleaseRetries, "the number of times to retry releasing the wants lockout and chosen one status on shutdown")
f.Uint64(prefix+".msg-per-poll", uint64(DefaultSeqCoordinatorConfig.MsgPerPoll), "will only be marked as wanting the lockout if not too far behind")
f.String(prefix+".my-url", DefaultSeqCoordinatorConfig.MyUrl, "url for this sequencer if it is the chosen")
f.Bool(prefix+".delete-finalized-msgs", DefaultSeqCoordinatorConfig.DeleteFinalizedMsgs, "enable deleting of finalized messages from redis")
signature.SignVerifyConfigAddOptions(prefix+".signer", f)
}

Expand All @@ -104,31 +107,33 @@ var DefaultSeqCoordinatorConfig = SeqCoordinatorConfig{
RedisUrl: "",
LockoutDuration: time.Minute,
LockoutSpare: 30 * time.Second,
SeqNumDuration: 24 * time.Hour,
SeqNumDuration: 10 * 24 * time.Hour,
UpdateInterval: 250 * time.Millisecond,
HandoffTimeout: 30 * time.Second,
SafeShutdownDelay: 5 * time.Second,
ReleaseRetries: 4,
RetryInterval: 50 * time.Millisecond,
MsgPerPoll: 2000,
MyUrl: redisutil.INVALID_URL,
DeleteFinalizedMsgs: true,
Signer: signature.DefaultSignVerifyConfig,
}

var TestSeqCoordinatorConfig = SeqCoordinatorConfig{
Enable: false,
RedisUrl: "",
LockoutDuration: time.Second * 2,
LockoutSpare: time.Millisecond * 10,
SeqNumDuration: time.Minute * 10,
UpdateInterval: time.Millisecond * 10,
HandoffTimeout: time.Millisecond * 200,
SafeShutdownDelay: time.Millisecond * 100,
ReleaseRetries: 4,
RetryInterval: time.Millisecond * 3,
MsgPerPoll: 20,
MyUrl: redisutil.INVALID_URL,
Signer: signature.DefaultSignVerifyConfig,
Enable: false,
RedisUrl: "",
LockoutDuration: time.Second * 2,
LockoutSpare: time.Millisecond * 10,
SeqNumDuration: time.Minute * 10,
UpdateInterval: time.Millisecond * 10,
HandoffTimeout: time.Millisecond * 200,
SafeShutdownDelay: time.Millisecond * 100,
ReleaseRetries: 4,
RetryInterval: time.Millisecond * 3,
MsgPerPoll: 20,
MyUrl: redisutil.INVALID_URL,
DeleteFinalizedMsgs: true,
Signer: signature.DefaultSignVerifyConfig,
}

func NewSeqCoordinator(
Expand All @@ -149,6 +154,7 @@ func NewSeqCoordinator(
}
coordinator := &SeqCoordinator{
RedisCoordinator: *redisCoordinator,
sync: sync,
streamer: streamer,
sequencer: sequencer,
config: config,
Expand Down Expand Up @@ -338,6 +344,14 @@ func (c *SeqCoordinator) acquireLockoutAndWriteMessage(ctx context.Context, msgC
return nil
}

func (c *SeqCoordinator) getRemoteFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex, error) {
resStr, err := c.Client.Get(ctx, redisutil.FINALIZED_MSG_COUNT_KEY).Result()
if err != nil {
return 0, err
}
return c.signedBytesToMsgCount(ctx, []byte(resStr))
}

func (c *SeqCoordinator) getRemoteMsgCountImpl(ctx context.Context, r redis.Cmdable) (arbutil.MessageIndex, error) {
resStr, err := r.Get(ctx, redisutil.MSG_COUNT_KEY).Result()
if errors.Is(err, redis.Nil) {
Expand Down Expand Up @@ -473,6 +487,17 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin
return c.noRedisError()
}
// Was, and still is, the active sequencer
if c.config.DeleteFinalizedMsgs {
// Before proceeding, first try deleting finalized messages from redis and setting the finalizedMsgCount key
finalized, err := c.sync.GetFinalizedMsgCount(ctx)
if err != nil {
log.Warn("Error getting finalizedMessageCount from syncMonitor: %w", err)
} else if finalized == 0 {
log.Warn("SyncMonitor returned zero finalizedMessageCount")
} else if err := c.deleteFinalizedMsgsFromRedis(ctx, finalized); err != nil {
log.Warn("Coordinator failed to delete finalized messages from redis", "err", err)
}
}
// We leave a margin of error of either a five times the update interval or a fifth of the lockout duration, whichever is greater.
marginOfError := arbmath.MaxInt(c.config.LockoutDuration/5, c.config.UpdateInterval*5)
if time.Now().Add(marginOfError).Before(atomicTimeRead(&c.lockoutUntil)) {
Expand All @@ -492,6 +517,62 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin
return c.noRedisError()
}

func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context, finalized arbutil.MessageIndex) error {
deleteMsgsAndUpdateFinalizedMsgCount := func(keys []string) error {
if len(keys) > 0 {
// To support cases during init we delete keys from reverse (i.e lowest seq num first), so that even if deletion fails in one of the iterations
// next time deleteFinalizedMsgsFromRedis is called we dont miss undeleted messages, as exists is checked from higher seqnum to lower.
// In non-init cases it doesn't matter how we delete as we always try to delete from prevFinalized to finalized
batchDeleteCount := 1000
for i := len(keys); i > 0; i -= batchDeleteCount {
if err := c.Client.Del(ctx, keys[max(0, i-batchDeleteCount):i]...).Err(); err != nil {
return fmt.Errorf("error deleting finalized messages and their signatures from redis: %w", err)
}
}
}
finalizedBytes, err := c.msgCountToSignedBytes(finalized)
if err != nil {
return err
}
if err = c.Client.Set(ctx, redisutil.FINALIZED_MSG_COUNT_KEY, finalizedBytes, c.config.SeqNumDuration).Err(); err != nil {
return fmt.Errorf("couldn't set %s key to current finalizedMsgCount in redis: %w", redisutil.FINALIZED_MSG_COUNT_KEY, err)
}
return nil
}
prevFinalized, err := c.getRemoteFinalizedMsgCount(ctx)
if errors.Is(err, redis.Nil) {
var keys []string
for msg := finalized - 1; msg > 0; msg-- {
exists, err := c.Client.Exists(ctx, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)).Result()
if err != nil {
// If there is an error deleting finalized messages during init, we retry later either from this sequencer or from another
return err
}
if exists == 0 {
break
}
keys = append(keys, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg))
}
log.Info("Initializing finalizedMsgCount and deleting finalized messages from redis", "finalizedMsgCount", finalized)
return deleteMsgsAndUpdateFinalizedMsgCount(keys)
} else if err != nil {
return fmt.Errorf("error getting finalizedMsgCount value from redis: %w", err)
}
remoteMsgCount, err := c.getRemoteMsgCountImpl(ctx, c.Client)
if err != nil {
return fmt.Errorf("cannot get remote message count: %w", err)
}
msgToDelete := min(finalized, remoteMsgCount)
if prevFinalized < msgToDelete {
var keys []string
for msg := prevFinalized; msg < msgToDelete; msg++ {
keys = append(keys, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg))
}
return deleteMsgsAndUpdateFinalizedMsgCount(keys)
}
return nil
}

func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
chosenSeq, err := c.RecommendSequencerWantingLockout(ctx)
if err != nil {
Expand Down Expand Up @@ -522,19 +603,24 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
log.Error("cannot read message count", "err", err)
return c.config.UpdateInterval
}
remoteFinalizedMsgCount, err := c.getRemoteFinalizedMsgCount(ctx)
if err != nil {
loglevel := log.Error
if errors.Is(err, redis.Nil) {
loglevel = log.Debug
}
loglevel("Cannot get remote finalized message count, might encounter failed to read message warnings later", "err", err)
}
remoteMsgCount, err := c.GetRemoteMsgCount()
if err != nil {
log.Warn("cannot get remote message count", "err", err)
return c.retryAfterRedisError()
}
readUntil := remoteMsgCount
if readUntil > localMsgCount+c.config.MsgPerPoll {
readUntil = localMsgCount + c.config.MsgPerPoll
}
readUntil := min(localMsgCount+c.config.MsgPerPoll, remoteMsgCount)
var messages []arbostypes.MessageWithMetadata
msgToRead := localMsgCount
var msgReadErr error
for msgToRead < readUntil {
for msgToRead < readUntil && localMsgCount >= remoteFinalizedMsgCount {
var resString string
resString, msgReadErr = c.Client.Get(ctx, redisutil.MessageKeyFor(msgToRead)).Result()
if msgReadErr != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,94 @@ func TestRedisSeqCoordinatorAtomic(t *testing.T) {
}

}

func TestSeqCoordinatorDeletesFinalizedMessages(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

coordConfig := TestSeqCoordinatorConfig
coordConfig.LockoutDuration = time.Millisecond * 100
coordConfig.LockoutSpare = time.Millisecond * 10
coordConfig.Signer.ECDSA.AcceptSequencer = false
coordConfig.Signer.SymmetricFallback = true
coordConfig.Signer.SymmetricSign = true
coordConfig.Signer.Symmetric.Dangerous.DisableSignatureVerification = true
coordConfig.Signer.Symmetric.SigningKey = ""

nullSigner, err := signature.NewSignVerify(&coordConfig.Signer, nil, nil)
Require(t, err)

redisUrl := redisutil.CreateTestRedis(ctx, t)
coordConfig.RedisUrl = redisUrl

config := coordConfig
config.MyUrl = "test"
redisCoordinator, err := redisutil.NewRedisCoordinator(config.RedisUrl)
Require(t, err)
coordinator := &SeqCoordinator{
RedisCoordinator: *redisCoordinator,
config: config,
signer: nullSigner,
}

// Add messages to redis
var keys []string
msgBytes, err := coordinator.msgCountToSignedBytes(0)
Require(t, err)
for i := arbutil.MessageIndex(1); i <= 10; i++ {
err = coordinator.Client.Set(ctx, redisutil.MessageKeyFor(i), msgBytes, time.Hour).Err()
Require(t, err)
err = coordinator.Client.Set(ctx, redisutil.MessageSigKeyFor(i), msgBytes, time.Hour).Err()
Require(t, err)
keys = append(keys, redisutil.MessageKeyFor(i), redisutil.MessageSigKeyFor(i))
}
// Set msgCount key
msgCountBytes, err := coordinator.msgCountToSignedBytes(11)
Require(t, err)
err = coordinator.Client.Set(ctx, redisutil.MSG_COUNT_KEY, msgCountBytes, time.Hour).Err()
Require(t, err)
exists, err := coordinator.Client.Exists(ctx, keys...).Result()
Require(t, err)
if exists != 20 {
t.Fatal("couldn't find all messages and signatures in redis")
}

// Set finalizedMsgCount and delete finalized messages
err = coordinator.deleteFinalizedMsgsFromRedis(ctx, 5)
Require(t, err)

// Check if messages and signatures were deleted successfully
exists, err = coordinator.Client.Exists(ctx, keys[:8]...).Result()
Require(t, err)
if exists != 0 {
t.Fatal("finalized messages and signatures in range 1 to 4 were not deleted")
}

// Check if finalizedMsgCount was set to correct value
finalized, err := coordinator.getRemoteFinalizedMsgCount(ctx)
Require(t, err)
if finalized != 5 {
t.Fatalf("incorrect finalizedMsgCount, want: 5, have: %d", finalized)
}

// Try deleting finalized messages when theres already a finalizedMsgCount
err = coordinator.deleteFinalizedMsgsFromRedis(ctx, 7)
Require(t, err)
exists, err = coordinator.Client.Exists(ctx, keys[8:12]...).Result()
Require(t, err)
if exists != 0 {
t.Fatal("finalized messages and signatures in range 5 to 6 were not deleted")
}
finalized, err = coordinator.getRemoteFinalizedMsgCount(ctx)
Require(t, err)
if finalized != 7 {
t.Fatalf("incorrect finalizedMsgCount, want: 7, have: %d", finalized)
}

// Check that non-finalized messages are still available in redis
exists, err = coordinator.Client.Exists(ctx, keys[12:]...).Result()
Require(t, err)
if exists != 8 {
t.Fatal("non-finalized messages and signatures in range 7 to 10 are not fully available")
}
}
7 changes: 7 additions & 0 deletions arbnode/sync_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ func (s *SyncMonitor) SyncTargetMessageCount() arbutil.MessageIndex {
return s.syncTarget
}

func (s *SyncMonitor) GetFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex, error) {
if s.inboxReader != nil && s.inboxReader.l1Reader != nil {
return s.inboxReader.GetFinalizedMsgCount(ctx)
}
return 0, nil
}

func (s *SyncMonitor) maxMessageCount() (arbutil.MessageIndex, error) {
msgCount, err := s.txStreamer.GetMessageCount()
if err != nil {
Expand Down
Loading

0 comments on commit 93aaaef

Please sign in to comment.