From 79420e9ecc81eaafac1af632d4d2d0c6fb9f7287 Mon Sep 17 00:00:00 2001 From: "Derrick J. Wippler" Date: Wed, 16 Dec 2020 14:46:56 -0600 Subject: [PATCH 1/3] Fixed leaky bucket ResetTime invalid on first request --- algorithms.go | 2 +- functional_test.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/algorithms.go b/algorithms.go index f1a07064..5d321886 100644 --- a/algorithms.go +++ b/algorithms.go @@ -313,7 +313,7 @@ func leakyBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er Status: Status_UNDER_LIMIT, Limit: r.Limit, Remaining: r.Limit - r.Hits, - ResetTime: duration / r.Limit, + ResetTime: now + duration / r.Limit, } // Client could be requesting that we start with the bucket OVER_LIMIT diff --git a/functional_test.go b/functional_test.go index cb2d6e21..e293fc04 100644 --- a/functional_test.go +++ b/functional_test.go @@ -191,6 +191,7 @@ func TestLeakyBucket(t *testing.T) { }, } + now := clock.Now() for i, test := range tests { resp, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{ Requests: []*guber.RateLimitReq{ @@ -212,7 +213,7 @@ func TestLeakyBucket(t *testing.T) { assert.Equal(t, test.Status, rl.Status, i) assert.Equal(t, test.Remaining, rl.Remaining, i) assert.Equal(t, int64(5), rl.Limit, i) - assert.True(t, rl.ResetTime != 0) + assert.True(t, rl.ResetTime > now.Unix()) clock.Advance(test.Sleep) } } From 9143dd31185922b8342a6c6cc02a99ad011d887b Mon Sep 17 00:00:00 2001 From: "Derrick J. Wippler" Date: Fri, 18 Dec 2020 14:01:39 -0600 Subject: [PATCH 2/3] Fix leaky bucket algo as reported in #77 --- algorithms.go | 39 ++++---- functional_test.go | 229 +++++++++++++++++++++++++++++++++++++++------ store.go | 2 +- store_test.go | 2 +- 4 files changed, 223 insertions(+), 49 deletions(-) diff --git a/algorithms.go b/algorithms.go index 5d321886..1b0dfbdd 100644 --- a/algorithms.go +++ b/algorithms.go @@ -204,7 +204,7 @@ func leakyBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er } if HasBehavior(r.Behavior, Behavior_RESET_REMAINING) { - b.Remaining = r.Limit + b.Remaining = float64(r.Limit) } // Update limit and duration if they changed @@ -231,18 +231,22 @@ func leakyBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er duration = expire - (n.UnixNano() / 1000000) } - // Calculate how much leaked out of the bucket since the last hit + // Calculate how much leaked out of the bucket since the last time we leaked a hit elapsed := now - b.UpdatedAt - leak := int64(float64(elapsed) / rate) + leak := float64(elapsed) / rate - b.Remaining += leak - if b.Remaining > b.Limit { - b.Remaining = b.Limit + if int64(b.Remaining) > b.Limit { + b.Remaining = float64(b.Limit) + } + + if int64(leak) > 0 { + b.Remaining += leak + b.UpdatedAt = now } rl := &RateLimitResp{ Limit: b.Limit, - Remaining: b.Remaining, + Remaining: int64(b.Remaining), Status: Status_UNDER_LIMIT, ResetTime: now + int64(rate), } @@ -254,26 +258,21 @@ func leakyBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er } // If we are already at the limit - if b.Remaining == 0 { + if int64(b.Remaining) == 0 { rl.Status = Status_OVER_LIMIT return rl, nil } - // Only update the timestamp if client is incrementing the hit - if r.Hits != 0 { - b.UpdatedAt = now - } - // If requested hits takes the remainder - if b.Remaining == r.Hits { - b.Remaining = 0 + if int64(b.Remaining) == r.Hits { + b.Remaining -= float64(r.Hits) rl.Remaining = 0 return rl, nil } // If requested is more than available, then return over 
the limit // without updating the bucket. - if r.Hits > b.Remaining { + if r.Hits > int64(b.Remaining) { rl.Status = Status_OVER_LIMIT return rl, nil } @@ -283,8 +282,8 @@ func leakyBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er return rl, nil } - b.Remaining -= r.Hits - rl.Remaining = b.Remaining + b.Remaining -= float64(r.Hits) + rl.Remaining = int64(b.Remaining) c.UpdateExpiration(r.HashKey(), now*duration) return rl, nil } @@ -303,7 +302,7 @@ func leakyBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er // Create a new leaky bucket b := LeakyBucketItem{ - Remaining: r.Limit - r.Hits, + Remaining: float64(r.Limit - r.Hits), Limit: r.Limit, Duration: duration, UpdatedAt: now, @@ -313,7 +312,7 @@ func leakyBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er Status: Status_UNDER_LIMIT, Limit: r.Limit, Remaining: r.Limit - r.Hits, - ResetTime: now + duration / r.Limit, + ResetTime: now + duration/r.Limit, } // Client could be requesting that we start with the bucket OVER_LIMIT diff --git a/functional_test.go b/functional_test.go index e293fc04..bf18235c 100644 --- a/functional_test.go +++ b/functional_test.go @@ -118,7 +118,7 @@ func TestTokenBucket(t *testing.T) { { Remaining: 0, Status: guber.Status_UNDER_LIMIT, - Sleep: clock.Duration(clock.Millisecond * 100), + Sleep: clock.Millisecond * 100, }, { Remaining: 1, @@ -153,6 +153,81 @@ func TestTokenBucket(t *testing.T) { } } +func TestTokenBucketGregorian(t *testing.T) { + defer clock.Freeze(clock.Now()).Unfreeze() + + client, errs := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress, nil) + require.Nil(t, errs) + + tests := []struct { + Name string + Remaining int64 + Status guber.Status + Sleep clock.Duration + Hits int64 + }{ + { + Name: "first hit", + Hits: 1, + Remaining: 59, + Status: guber.Status_UNDER_LIMIT, + }, + { + Name: "second hit", + Hits: 1, + Remaining: 58, + Status: guber.Status_UNDER_LIMIT, + }, + { + Name: "consume remaining hits", + Hits: 58, + Remaining: 0, + Status: guber.Status_UNDER_LIMIT, + }, + { + Name: "should be over the limit", + Hits: 1, + Remaining: 0, + Status: guber.Status_OVER_LIMIT, + Sleep: clock.Second * 61, + }, + { + Name: "should be under the limit", + Hits: 0, + Remaining: 60, + Status: guber.Status_UNDER_LIMIT, + }, + } + + for _, test := range tests { + t.Run(test.Name, func(t *testing.T) { + resp, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{ + Requests: []*guber.RateLimitReq{ + { + Name: "test_token_bucket_greg", + UniqueKey: "account:12345", + Behavior: guber.Behavior_DURATION_IS_GREGORIAN, + Algorithm: guber.Algorithm_TOKEN_BUCKET, + Duration: guber.GregorianMinutes, + Hits: test.Hits, + Limit: 60, + }, + }, + }) + require.Nil(t, err) + + rl := resp.Responses[0] + + assert.Empty(t, rl.Error) + assert.Equal(t, test.Status, rl.Status) + assert.Equal(t, test.Remaining, rl.Remaining) + assert.Equal(t, int64(60), rl.Limit) + assert.True(t, rl.ResetTime != 0) + clock.Advance(test.Sleep) + }) + } +} + func TestLeakyBucket(t *testing.T) { defer clock.Freeze(clock.Now()).Unfreeze() @@ -160,61 +235,161 @@ func TestLeakyBucket(t *testing.T) { require.Nil(t, errs) tests := []struct { + Name string Hits int64 Remaining int64 Status guber.Status Sleep clock.Duration }{ { - Hits: 5, + Name: "first hit", + Hits: 1, + Remaining: 9, + Status: guber.Status_UNDER_LIMIT, + Sleep: clock.Second, + }, + { + Name: "second hit; no leak", + Hits: 1, + Remaining: 8, + Status: guber.Status_UNDER_LIMIT, + Sleep: clock.Second, 
+ }, + { + Name: "third hit; no leak", + Hits: 1, + Remaining: 7, + Status: guber.Status_UNDER_LIMIT, + Sleep: clock.Millisecond * 1500, + }, + { + Name: "should leak one hit 3 seconds after first hit", + Hits: 0, + Remaining: 8, + Status: guber.Status_UNDER_LIMIT, + Sleep: clock.Second * 3, + }, + { + Name: "3 Seconds later we should have leaked another hit", + Hits: 0, + Remaining: 9, + Status: guber.Status_UNDER_LIMIT, + Sleep: clock.Duration(0), + }, + { + Name: "max out our bucket and sleep for 3 seconds", + Hits: 9, Remaining: 0, Status: guber.Status_UNDER_LIMIT, Sleep: clock.Duration(0), }, { + Name: "should be over the limit", Hits: 1, Remaining: 0, Status: guber.Status_OVER_LIMIT, - Sleep: clock.Millisecond * 100, + Sleep: clock.Second * 3, }, { + Name: "should have leaked 1 hit", + Hits: 0, + Remaining: 1, + Status: guber.Status_UNDER_LIMIT, + Sleep: clock.Second, + }, + } + + now := clock.Now() + for _, test := range tests { + t.Run(test.Name, func(t *testing.T) { + resp, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{ + Requests: []*guber.RateLimitReq{ + { + Name: "test_leaky_bucket", + UniqueKey: "account:1234", + Algorithm: guber.Algorithm_LEAKY_BUCKET, + Duration: guber.Second * 30, + Hits: test.Hits, + Limit: 10, + }, + }, + }) + clock.Freeze(clock.Now()) + require.NoError(t, err) + + rl := resp.Responses[0] + + assert.Equal(t, test.Status, rl.Status) + assert.Equal(t, test.Remaining, rl.Remaining) + assert.Equal(t, int64(10), rl.Limit) + assert.True(t, rl.ResetTime > now.Unix()) + clock.Advance(test.Sleep) + }) + } +} + +func TestLeakyBucketGregorian(t *testing.T) { + defer clock.Freeze(clock.Now()).Unfreeze() + + client, errs := guber.DialV1Server(cluster.PeerAt(0).GRPCAddress, nil) + require.Nil(t, errs) + + tests := []struct { + Name string + Hits int64 + Remaining int64 + Status guber.Status + Sleep clock.Duration + }{ + { + Name: "first hit", Hits: 1, - Remaining: 0, + Remaining: 59, Status: guber.Status_UNDER_LIMIT, - Sleep: clock.Millisecond * 400, + Sleep: clock.Millisecond * 500, }, { + Name: "second hit; no leak", Hits: 1, - Remaining: 4, + Remaining: 58, + Status: guber.Status_UNDER_LIMIT, + Sleep: clock.Second, + }, + { + Name: "third hit; leak one hit", + Hits: 1, + Remaining: 58, Status: guber.Status_UNDER_LIMIT, - Sleep: clock.Duration(0), }, } now := clock.Now() - for i, test := range tests { - resp, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{ - Requests: []*guber.RateLimitReq{ - { - Name: "test_leaky_bucket", - UniqueKey: "account:1234", - Algorithm: guber.Algorithm_LEAKY_BUCKET, - Duration: guber.Millisecond * 300, - Hits: test.Hits, - Limit: 5, + for _, test := range tests { + t.Run(test.Name, func(t *testing.T) { + resp, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{ + Requests: []*guber.RateLimitReq{ + { + Name: "test_leaky_bucket_greg", + UniqueKey: "account:12345", + Behavior: guber.Behavior_DURATION_IS_GREGORIAN, + Algorithm: guber.Algorithm_LEAKY_BUCKET, + Duration: guber.GregorianMinutes, + Hits: test.Hits, + Limit: 60, + }, }, - }, - }) - clock.Freeze(clock.Now()) - require.NoError(t, err) + }) + clock.Freeze(clock.Now()) + require.NoError(t, err) - rl := resp.Responses[0] + rl := resp.Responses[0] - assert.Equal(t, test.Status, rl.Status, i) - assert.Equal(t, test.Remaining, rl.Remaining, i) - assert.Equal(t, int64(5), rl.Limit, i) - assert.True(t, rl.ResetTime > now.Unix()) - clock.Advance(test.Sleep) + assert.Equal(t, test.Status, rl.Status) + assert.Equal(t, 
test.Remaining, rl.Remaining) + assert.Equal(t, int64(60), rl.Limit) + assert.True(t, rl.ResetTime > now.Unix()) + clock.Advance(test.Sleep) + }) } } diff --git a/store.go b/store.go index 5bfd2b7f..4f428a66 100644 --- a/store.go +++ b/store.go @@ -11,7 +11,7 @@ package gubernator type LeakyBucketItem struct { Limit int64 Duration int64 - Remaining int64 + Remaining float64 UpdatedAt int64 } diff --git a/store_test.go b/store_test.go index d6339e6a..152a7df6 100644 --- a/store_test.go +++ b/store_test.go @@ -291,7 +291,7 @@ func getRemaining(item *gubernator.CacheItem) int64 { case gubernator.Algorithm_TOKEN_BUCKET: return item.Value.(*gubernator.TokenBucketItem).Remaining case gubernator.Algorithm_LEAKY_BUCKET: - return item.Value.(*gubernator.LeakyBucketItem).Remaining + return int64(item.Value.(*gubernator.LeakyBucketItem).Remaining) } return 0 } From f3cccc28c73155ae86b5c5d8177b7c2d83c4fe6a Mon Sep 17 00:00:00 2001 From: "Derrick J. Wippler" Date: Fri, 18 Dec 2020 14:23:02 -0600 Subject: [PATCH 3/3] Fixed race condition on peer shutdown --- peer_client.go | 7 +++++-- version | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/peer_client.go b/peer_client.go index faed58e5..a95c9964 100644 --- a/peer_client.go +++ b/peer_client.go @@ -239,13 +239,16 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq return nil, err } + // See NOTE above about RLock and wg.Add(1) + c.mutex.RLock() + if c.status == peerClosing { + return nil, &PeerErr{err: errors.New("already disconnecting")} + } req := request{request: r, resp: make(chan *response, 1)} // Enqueue the request to be sent c.queue <- &req - // See NOTE above about RLock and wg.Add(1) - c.mutex.RLock() c.wg.Add(1) defer func() { c.mutex.RUnlock() diff --git a/version b/version index 3c029ddf..d17f0653 100644 --- a/version +++ b/version @@ -1 +1 @@ -1.0.0-rc.3 +1.0.0-rc.4
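
Illustration (not part of the applied patches): a minimal, self-contained sketch of the leak math that PATCH 2/3 introduces, using a hypothetical leakyBucket type and leak helper rather than the gubernator API itself. It mirrors the patched ordering in algorithms.go: Remaining is now a float64 so fractional leakage is not truncated away, the bucket is clamped to Limit before the leak is applied, and Remaining/UpdatedAt only change once at least one whole hit has drained out. Field names and the millisecond units below are assumptions for the sketch.

    package main

    import "fmt"

    // Hypothetical stand-in for gubernator's LeakyBucketItem, for illustration only.
    type leakyBucket struct {
        limit     int64
        duration  int64   // drain window in milliseconds
        remaining float64 // float64 so partial leaks accumulate across requests
        updatedAt int64   // last time (ms) a whole hit was leaked back
    }

    // leak credits back any whole hits that drained since updatedAt, following the
    // same order of operations as the patched leakyBucket(): clamp first, then add.
    func (b *leakyBucket) leak(nowMs int64) {
        rate := float64(b.duration) / float64(b.limit) // ms it takes one hit to leak out
        elapsed := nowMs - b.updatedAt
        leaked := float64(elapsed) / rate

        // Clamp before crediting, as in the patch; a later request re-clamps if the
        // credit pushes remaining past the limit.
        if int64(b.remaining) > b.limit {
            b.remaining = float64(b.limit)
        }
        if int64(leaked) > 0 {
            b.remaining += leaked
            b.updatedAt = nowMs
        }
    }

    func main() {
        // Limit 10 over 30s => one hit leaks back every 3 seconds, matching the
        // "should leak one hit 3 seconds after first hit" case in TestLeakyBucket.
        b := leakyBucket{limit: 10, duration: 30000, remaining: 7, updatedAt: 0}
        b.leak(3000)
        fmt.Println(b.remaining) // prints 8
    }

Under these assumptions, a request arriving before a full hit has leaked (e.g. 1.5s into a 3s-per-hit drain) leaves both remaining and updatedAt untouched, which is what keeps the "no leak" test cases in the new TestLeakyBucket table at the same remaining count.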