Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
Fix leaky bucket algo as reported in #77
Browse files Browse the repository at this point in the history
  • Loading branch information
thrawn01 committed Dec 18, 2020
1 parent 79420e9 commit 9143dd3
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 49 deletions.
39 changes: 19 additions & 20 deletions algorithms.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func leakyBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er
}

if HasBehavior(r.Behavior, Behavior_RESET_REMAINING) {
b.Remaining = r.Limit
b.Remaining = float64(r.Limit)
}

// Update limit and duration if they changed
Expand All @@ -231,18 +231,22 @@ func leakyBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er
duration = expire - (n.UnixNano() / 1000000)
}

// Calculate how much leaked out of the bucket since the last hit
// Calculate how much leaked out of the bucket since the last time we leaked a hit
elapsed := now - b.UpdatedAt
leak := int64(float64(elapsed) / rate)
leak := float64(elapsed) / rate

b.Remaining += leak
if b.Remaining > b.Limit {
b.Remaining = b.Limit
if int64(b.Remaining) > b.Limit {
b.Remaining = float64(b.Limit)
}

if int64(leak) > 0 {
b.Remaining += leak
b.UpdatedAt = now
}

rl := &RateLimitResp{
Limit: b.Limit,
Remaining: b.Remaining,
Remaining: int64(b.Remaining),
Status: Status_UNDER_LIMIT,
ResetTime: now + int64(rate),
}
Expand All @@ -254,26 +258,21 @@ func leakyBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er
}

// If we are already at the limit
if b.Remaining == 0 {
if int64(b.Remaining) == 0 {
rl.Status = Status_OVER_LIMIT
return rl, nil
}

// Only update the timestamp if client is incrementing the hit
if r.Hits != 0 {
b.UpdatedAt = now
}

// If requested hits takes the remainder
if b.Remaining == r.Hits {
b.Remaining = 0
if int64(b.Remaining) == r.Hits {
b.Remaining -= float64(r.Hits)
rl.Remaining = 0
return rl, nil
}

// If requested is more than available, then return over the limit
// without updating the bucket.
if r.Hits > b.Remaining {
if r.Hits > int64(b.Remaining) {
rl.Status = Status_OVER_LIMIT
return rl, nil
}
Expand All @@ -283,8 +282,8 @@ func leakyBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er
return rl, nil
}

b.Remaining -= r.Hits
rl.Remaining = b.Remaining
b.Remaining -= float64(r.Hits)
rl.Remaining = int64(b.Remaining)
c.UpdateExpiration(r.HashKey(), now*duration)
return rl, nil
}
Expand All @@ -303,7 +302,7 @@ func leakyBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er

// Create a new leaky bucket
b := LeakyBucketItem{
Remaining: r.Limit - r.Hits,
Remaining: float64(r.Limit - r.Hits),
Limit: r.Limit,
Duration: duration,
UpdatedAt: now,
Expand All @@ -313,7 +312,7 @@ func leakyBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er
Status: Status_UNDER_LIMIT,
Limit: r.Limit,
Remaining: r.Limit - r.Hits,
ResetTime: now + duration / r.Limit,
ResetTime: now + duration/r.Limit,
}

// Client could be requesting that we start with the bucket OVER_LIMIT
Expand Down
229 changes: 202 additions & 27 deletions functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func TestTokenBucket(t *testing.T) {
{
Remaining: 0,
Status: guber.Status_UNDER_LIMIT,
Sleep: clock.Duration(clock.Millisecond * 100),
Sleep: clock.Millisecond * 100,
},
{
Remaining: 1,
Expand Down Expand Up @@ -153,68 +153,243 @@ func TestTokenBucket(t *testing.T) {
}
}

func TestTokenBucketGregorian(t *testing.T) {
defer clock.Freeze(clock.Now()).Unfreeze()

client, errs := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress, nil)
require.Nil(t, errs)

tests := []struct {
Name string
Remaining int64
Status guber.Status
Sleep clock.Duration
Hits int64
}{
{
Name: "first hit",
Hits: 1,
Remaining: 59,
Status: guber.Status_UNDER_LIMIT,
},
{
Name: "second hit",
Hits: 1,
Remaining: 58,
Status: guber.Status_UNDER_LIMIT,
},
{
Name: "consume remaining hits",
Hits: 58,
Remaining: 0,
Status: guber.Status_UNDER_LIMIT,
},
{
Name: "should be over the limit",
Hits: 1,
Remaining: 0,
Status: guber.Status_OVER_LIMIT,
Sleep: clock.Second * 61,
},
{
Name: "should be under the limit",
Hits: 0,
Remaining: 60,
Status: guber.Status_UNDER_LIMIT,
},
}

for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
resp, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
{
Name: "test_token_bucket_greg",
UniqueKey: "account:12345",
Behavior: guber.Behavior_DURATION_IS_GREGORIAN,
Algorithm: guber.Algorithm_TOKEN_BUCKET,
Duration: guber.GregorianMinutes,
Hits: test.Hits,
Limit: 60,
},
},
})
require.Nil(t, err)

rl := resp.Responses[0]

assert.Empty(t, rl.Error)
assert.Equal(t, test.Status, rl.Status)
assert.Equal(t, test.Remaining, rl.Remaining)
assert.Equal(t, int64(60), rl.Limit)
assert.True(t, rl.ResetTime != 0)
clock.Advance(test.Sleep)
})
}
}

func TestLeakyBucket(t *testing.T) {
defer clock.Freeze(clock.Now()).Unfreeze()

client, errs := guber.DialV1Server(cluster.PeerAt(0).GRPCAddress, nil)
require.Nil(t, errs)

tests := []struct {
Name string
Hits int64
Remaining int64
Status guber.Status
Sleep clock.Duration
}{
{
Hits: 5,
Name: "first hit",
Hits: 1,
Remaining: 9,
Status: guber.Status_UNDER_LIMIT,
Sleep: clock.Second,
},
{
Name: "second hit; no leak",
Hits: 1,
Remaining: 8,
Status: guber.Status_UNDER_LIMIT,
Sleep: clock.Second,
},
{
Name: "third hit; no leak",
Hits: 1,
Remaining: 7,
Status: guber.Status_UNDER_LIMIT,
Sleep: clock.Millisecond * 1500,
},
{
Name: "should leak one hit 3 seconds after first hit",
Hits: 0,
Remaining: 8,
Status: guber.Status_UNDER_LIMIT,
Sleep: clock.Second * 3,
},
{
Name: "3 Seconds later we should have leaked another hit",
Hits: 0,
Remaining: 9,
Status: guber.Status_UNDER_LIMIT,
Sleep: clock.Duration(0),
},
{
Name: "max out our bucket and sleep for 3 seconds",
Hits: 9,
Remaining: 0,
Status: guber.Status_UNDER_LIMIT,
Sleep: clock.Duration(0),
},
{
Name: "should be over the limit",
Hits: 1,
Remaining: 0,
Status: guber.Status_OVER_LIMIT,
Sleep: clock.Millisecond * 100,
Sleep: clock.Second * 3,
},
{
Name: "should have leaked 1 hit",
Hits: 0,
Remaining: 1,
Status: guber.Status_UNDER_LIMIT,
Sleep: clock.Second,
},
}

now := clock.Now()
for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
resp, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
{
Name: "test_leaky_bucket",
UniqueKey: "account:1234",
Algorithm: guber.Algorithm_LEAKY_BUCKET,
Duration: guber.Second * 30,
Hits: test.Hits,
Limit: 10,
},
},
})
clock.Freeze(clock.Now())
require.NoError(t, err)

rl := resp.Responses[0]

assert.Equal(t, test.Status, rl.Status)
assert.Equal(t, test.Remaining, rl.Remaining)
assert.Equal(t, int64(10), rl.Limit)
assert.True(t, rl.ResetTime > now.Unix())
clock.Advance(test.Sleep)
})
}
}

func TestLeakyBucketGregorian(t *testing.T) {
defer clock.Freeze(clock.Now()).Unfreeze()

client, errs := guber.DialV1Server(cluster.PeerAt(0).GRPCAddress, nil)
require.Nil(t, errs)

tests := []struct {
Name string
Hits int64
Remaining int64
Status guber.Status
Sleep clock.Duration
}{
{
Name: "first hit",
Hits: 1,
Remaining: 0,
Remaining: 59,
Status: guber.Status_UNDER_LIMIT,
Sleep: clock.Millisecond * 400,
Sleep: clock.Millisecond * 500,
},
{
Name: "second hit; no leak",
Hits: 1,
Remaining: 4,
Remaining: 58,
Status: guber.Status_UNDER_LIMIT,
Sleep: clock.Second,
},
{
Name: "third hit; leak one hit",
Hits: 1,
Remaining: 58,
Status: guber.Status_UNDER_LIMIT,
Sleep: clock.Duration(0),
},
}

now := clock.Now()
for i, test := range tests {
resp, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
{
Name: "test_leaky_bucket",
UniqueKey: "account:1234",
Algorithm: guber.Algorithm_LEAKY_BUCKET,
Duration: guber.Millisecond * 300,
Hits: test.Hits,
Limit: 5,
for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
resp, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
{
Name: "test_leaky_bucket_greg",
UniqueKey: "account:12345",
Behavior: guber.Behavior_DURATION_IS_GREGORIAN,
Algorithm: guber.Algorithm_LEAKY_BUCKET,
Duration: guber.GregorianMinutes,
Hits: test.Hits,
Limit: 60,
},
},
},
})
clock.Freeze(clock.Now())
require.NoError(t, err)
})
clock.Freeze(clock.Now())
require.NoError(t, err)

rl := resp.Responses[0]
rl := resp.Responses[0]

assert.Equal(t, test.Status, rl.Status, i)
assert.Equal(t, test.Remaining, rl.Remaining, i)
assert.Equal(t, int64(5), rl.Limit, i)
assert.True(t, rl.ResetTime > now.Unix())
clock.Advance(test.Sleep)
assert.Equal(t, test.Status, rl.Status)
assert.Equal(t, test.Remaining, rl.Remaining)
assert.Equal(t, int64(60), rl.Limit)
assert.True(t, rl.ResetTime > now.Unix())
clock.Advance(test.Sleep)
})
}
}

Expand Down
2 changes: 1 addition & 1 deletion store.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ package gubernator
type LeakyBucketItem struct {
Limit int64
Duration int64
Remaining int64
Remaining float64
UpdatedAt int64
}

Expand Down
2 changes: 1 addition & 1 deletion store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func getRemaining(item *gubernator.CacheItem) int64 {
case gubernator.Algorithm_TOKEN_BUCKET:
return item.Value.(*gubernator.TokenBucketItem).Remaining
case gubernator.Algorithm_LEAKY_BUCKET:
return item.Value.(*gubernator.LeakyBucketItem).Remaining
return int64(item.Value.(*gubernator.LeakyBucketItem).Remaining)
}
return 0
}

0 comments on commit 9143dd3

Please sign in to comment.