Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
Merge pull request #80 from mailgun/thrawn/develop
Browse files Browse the repository at this point in the history
PIP-1011: Fix leaky bucket algorithm
  • Loading branch information
thrawn01 authored Dec 19, 2020
2 parents 4d87ed8 + f3cccc2 commit 6f37fde
Show file tree
Hide file tree
Showing 6 changed files with 230 additions and 52 deletions.
39 changes: 19 additions & 20 deletions algorithms.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func leakyBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er
}

if HasBehavior(r.Behavior, Behavior_RESET_REMAINING) {
b.Remaining = r.Limit
b.Remaining = float64(r.Limit)
}

// Update limit and duration if they changed
Expand All @@ -231,18 +231,22 @@ func leakyBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er
duration = expire - (n.UnixNano() / 1000000)
}

// Calculate how much leaked out of the bucket since the last hit
// Calculate how much leaked out of the bucket since the last time we leaked a hit
elapsed := now - b.UpdatedAt
leak := int64(float64(elapsed) / rate)
leak := float64(elapsed) / rate

b.Remaining += leak
if b.Remaining > b.Limit {
b.Remaining = b.Limit
if int64(b.Remaining) > b.Limit {
b.Remaining = float64(b.Limit)
}

if int64(leak) > 0 {
b.Remaining += leak
b.UpdatedAt = now
}

rl := &RateLimitResp{
Limit: b.Limit,
Remaining: b.Remaining,
Remaining: int64(b.Remaining),
Status: Status_UNDER_LIMIT,
ResetTime: now + int64(rate),
}
Expand All @@ -254,26 +258,21 @@ func leakyBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er
}

// If we are already at the limit
if b.Remaining == 0 {
if int64(b.Remaining) == 0 {
rl.Status = Status_OVER_LIMIT
return rl, nil
}

// Only update the timestamp if client is incrementing the hit
if r.Hits != 0 {
b.UpdatedAt = now
}

// If requested hits takes the remainder
if b.Remaining == r.Hits {
b.Remaining = 0
if int64(b.Remaining) == r.Hits {
b.Remaining -= float64(r.Hits)
rl.Remaining = 0
return rl, nil
}

// If requested is more than available, then return over the limit
// without updating the bucket.
if r.Hits > b.Remaining {
if r.Hits > int64(b.Remaining) {
rl.Status = Status_OVER_LIMIT
return rl, nil
}
Expand All @@ -283,8 +282,8 @@ func leakyBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er
return rl, nil
}

b.Remaining -= r.Hits
rl.Remaining = b.Remaining
b.Remaining -= float64(r.Hits)
rl.Remaining = int64(b.Remaining)
c.UpdateExpiration(r.HashKey(), now*duration)
return rl, nil
}
Expand All @@ -303,7 +302,7 @@ func leakyBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er

// Create a new leaky bucket
b := LeakyBucketItem{
Remaining: r.Limit - r.Hits,
Remaining: float64(r.Limit - r.Hits),
Limit: r.Limit,
Duration: duration,
UpdatedAt: now,
Expand All @@ -313,7 +312,7 @@ func leakyBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er
Status: Status_UNDER_LIMIT,
Limit: r.Limit,
Remaining: r.Limit - r.Hits,
ResetTime: duration / r.Limit,
ResetTime: now + duration/r.Limit,
}

// Client could be requesting that we start with the bucket OVER_LIMIT
Expand Down
230 changes: 203 additions & 27 deletions functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func TestTokenBucket(t *testing.T) {
{
Remaining: 0,
Status: guber.Status_UNDER_LIMIT,
Sleep: clock.Duration(clock.Millisecond * 100),
Sleep: clock.Millisecond * 100,
},
{
Remaining: 1,
Expand Down Expand Up @@ -153,67 +153,243 @@ func TestTokenBucket(t *testing.T) {
}
}

func TestTokenBucketGregorian(t *testing.T) {
defer clock.Freeze(clock.Now()).Unfreeze()

client, errs := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress, nil)
require.Nil(t, errs)

tests := []struct {
Name string
Remaining int64
Status guber.Status
Sleep clock.Duration
Hits int64
}{
{
Name: "first hit",
Hits: 1,
Remaining: 59,
Status: guber.Status_UNDER_LIMIT,
},
{
Name: "second hit",
Hits: 1,
Remaining: 58,
Status: guber.Status_UNDER_LIMIT,
},
{
Name: "consume remaining hits",
Hits: 58,
Remaining: 0,
Status: guber.Status_UNDER_LIMIT,
},
{
Name: "should be over the limit",
Hits: 1,
Remaining: 0,
Status: guber.Status_OVER_LIMIT,
Sleep: clock.Second * 61,
},
{
Name: "should be under the limit",
Hits: 0,
Remaining: 60,
Status: guber.Status_UNDER_LIMIT,
},
}

for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
resp, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
{
Name: "test_token_bucket_greg",
UniqueKey: "account:12345",
Behavior: guber.Behavior_DURATION_IS_GREGORIAN,
Algorithm: guber.Algorithm_TOKEN_BUCKET,
Duration: guber.GregorianMinutes,
Hits: test.Hits,
Limit: 60,
},
},
})
require.Nil(t, err)

rl := resp.Responses[0]

assert.Empty(t, rl.Error)
assert.Equal(t, test.Status, rl.Status)
assert.Equal(t, test.Remaining, rl.Remaining)
assert.Equal(t, int64(60), rl.Limit)
assert.True(t, rl.ResetTime != 0)
clock.Advance(test.Sleep)
})
}
}

func TestLeakyBucket(t *testing.T) {
defer clock.Freeze(clock.Now()).Unfreeze()

client, errs := guber.DialV1Server(cluster.PeerAt(0).GRPCAddress, nil)
require.Nil(t, errs)

tests := []struct {
Name string
Hits int64
Remaining int64
Status guber.Status
Sleep clock.Duration
}{
{
Hits: 5,
Name: "first hit",
Hits: 1,
Remaining: 9,
Status: guber.Status_UNDER_LIMIT,
Sleep: clock.Second,
},
{
Name: "second hit; no leak",
Hits: 1,
Remaining: 8,
Status: guber.Status_UNDER_LIMIT,
Sleep: clock.Second,
},
{
Name: "third hit; no leak",
Hits: 1,
Remaining: 7,
Status: guber.Status_UNDER_LIMIT,
Sleep: clock.Millisecond * 1500,
},
{
Name: "should leak one hit 3 seconds after first hit",
Hits: 0,
Remaining: 8,
Status: guber.Status_UNDER_LIMIT,
Sleep: clock.Second * 3,
},
{
Name: "3 Seconds later we should have leaked another hit",
Hits: 0,
Remaining: 9,
Status: guber.Status_UNDER_LIMIT,
Sleep: clock.Duration(0),
},
{
Name: "max out our bucket and sleep for 3 seconds",
Hits: 9,
Remaining: 0,
Status: guber.Status_UNDER_LIMIT,
Sleep: clock.Duration(0),
},
{
Name: "should be over the limit",
Hits: 1,
Remaining: 0,
Status: guber.Status_OVER_LIMIT,
Sleep: clock.Millisecond * 100,
Sleep: clock.Second * 3,
},
{
Name: "should have leaked 1 hit",
Hits: 0,
Remaining: 1,
Status: guber.Status_UNDER_LIMIT,
Sleep: clock.Second,
},
}

now := clock.Now()
for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
resp, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
{
Name: "test_leaky_bucket",
UniqueKey: "account:1234",
Algorithm: guber.Algorithm_LEAKY_BUCKET,
Duration: guber.Second * 30,
Hits: test.Hits,
Limit: 10,
},
},
})
clock.Freeze(clock.Now())
require.NoError(t, err)

rl := resp.Responses[0]

assert.Equal(t, test.Status, rl.Status)
assert.Equal(t, test.Remaining, rl.Remaining)
assert.Equal(t, int64(10), rl.Limit)
assert.True(t, rl.ResetTime > now.Unix())
clock.Advance(test.Sleep)
})
}
}

func TestLeakyBucketGregorian(t *testing.T) {
defer clock.Freeze(clock.Now()).Unfreeze()

client, errs := guber.DialV1Server(cluster.PeerAt(0).GRPCAddress, nil)
require.Nil(t, errs)

tests := []struct {
Name string
Hits int64
Remaining int64
Status guber.Status
Sleep clock.Duration
}{
{
Name: "first hit",
Hits: 1,
Remaining: 0,
Remaining: 59,
Status: guber.Status_UNDER_LIMIT,
Sleep: clock.Millisecond * 400,
Sleep: clock.Millisecond * 500,
},
{
Name: "second hit; no leak",
Hits: 1,
Remaining: 4,
Remaining: 58,
Status: guber.Status_UNDER_LIMIT,
Sleep: clock.Second,
},
{
Name: "third hit; leak one hit",
Hits: 1,
Remaining: 58,
Status: guber.Status_UNDER_LIMIT,
Sleep: clock.Duration(0),
},
}

for i, test := range tests {
resp, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
{
Name: "test_leaky_bucket",
UniqueKey: "account:1234",
Algorithm: guber.Algorithm_LEAKY_BUCKET,
Duration: guber.Millisecond * 300,
Hits: test.Hits,
Limit: 5,
now := clock.Now()
for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
resp, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
{
Name: "test_leaky_bucket_greg",
UniqueKey: "account:12345",
Behavior: guber.Behavior_DURATION_IS_GREGORIAN,
Algorithm: guber.Algorithm_LEAKY_BUCKET,
Duration: guber.GregorianMinutes,
Hits: test.Hits,
Limit: 60,
},
},
},
})
clock.Freeze(clock.Now())
require.NoError(t, err)
})
clock.Freeze(clock.Now())
require.NoError(t, err)

rl := resp.Responses[0]
rl := resp.Responses[0]

assert.Equal(t, test.Status, rl.Status, i)
assert.Equal(t, test.Remaining, rl.Remaining, i)
assert.Equal(t, int64(5), rl.Limit, i)
assert.True(t, rl.ResetTime != 0)
clock.Advance(test.Sleep)
assert.Equal(t, test.Status, rl.Status)
assert.Equal(t, test.Remaining, rl.Remaining)
assert.Equal(t, int64(60), rl.Limit)
assert.True(t, rl.ResetTime > now.Unix())
clock.Advance(test.Sleep)
})
}
}

Expand Down
7 changes: 5 additions & 2 deletions peer_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,13 +239,16 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq
return nil, err
}

// See NOTE above about RLock and wg.Add(1)
c.mutex.RLock()
if c.status == peerClosing {
return nil, &PeerErr{err: errors.New("already disconnecting")}
}
req := request{request: r, resp: make(chan *response, 1)}

// Enqueue the request to be sent
c.queue <- &req

// See NOTE above about RLock and wg.Add(1)
c.mutex.RLock()
c.wg.Add(1)
defer func() {
c.mutex.RUnlock()
Expand Down
Loading

0 comments on commit 6f37fde

Please sign in to comment.