Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Fix global mode #218

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/on-pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:
run: go mod download

- name: Test
run: go test -v -race -p=1 -count=1
run: go test -v -race -p=1 -count=1 -tags holster_test_mode
go-bench:
runs-on: ubuntu-latest
timeout-minutes: 30
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ lint: $(GOLANGCI_LINT)

.PHONY: test
test:
(go test -v -race -p=1 -count=1 -coverprofile coverage.out ./...; ret=$$?; \
(go test -v -race -p=1 -count=1 -tags holster_test_mode -coverprofile coverage.out ./...; ret=$$?; \
go tool cover -func coverage.out; \
go tool cover -html coverage.out -o coverage.html; \
exit $$ret)
Expand Down
9 changes: 5 additions & 4 deletions algorithms.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,16 +383,17 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

// If requested hits takes the remainder
if int64(b.Remaining) == r.Hits {
b.Remaining -= float64(r.Hits)
rl.Remaining = 0
b.Remaining = 0
rl.Remaining = int64(b.Remaining)
rl.ResetTime = now + (rl.Limit-rl.Remaining)*int64(rate)
return rl, nil
}

// If requested is more than available, then return over the limit
// without updating the bucket.
// If requested is more than available, drain bucket in order to converge as everything is returning OVER_LIMIT.
if r.Hits > int64(b.Remaining) {
metricOverLimitCounter.Add(1)
b.Remaining = 0
Copy link
Contributor

@elbuo8 elbuo8 Feb 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was changed because the owner receives collapsed/accumulated hits, so if the remainder is 50, drip/leak brings it to 75 and the peers reported a total of 200 hits, it's fair to say that the quota has been consumed AND that the tokens be taken away given that drip was applied and we don't want to give more tokens away than we should (even if you were dripping exclusively on the owner).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a great catch! However, this changes the semantics of asking for more than what is available. If we went forward with this change, any request that asks for more than is available will gobble up the remainder without actually using them.

For example: If my email limit is 1,000 and I've already sent 500 emails. Then I request to send 1,000 more emails, I would expect the 1,000 email request to be rejected as "over the limit". However, subsequent requests for 500 emails or less should be allowed, as I have 500 emails remaining within my rate limit window.

I think I have an idea of how to solve this. I'm adding some tests to cover this scenario and will have a PR soon.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had just merged in this behavior as a new behavior type DRAIN_OVER_LIMIT. I faced the same usability issue, but I did not want to break legacy applications that may have depended on old logic.
#209

rl.Remaining = int64(b.Remaining)
rl.Status = Status_OVER_LIMIT
return rl, nil
}
Expand Down
9 changes: 9 additions & 0 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ func PeerAt(idx int) gubernator.PeerInfo {
return peers[idx]
}

// FindOwningPeer finds the peer which owns the rate limit with the provided name and unique key
func FindOwningPeer(name, key string) (gubernator.PeerInfo, error) {
	// Any daemon can resolve ownership; the hash key is "<name>_<key>".
	hashKey := name + "_" + key
	peer, err := daemons[0].V1Server.GetPeer(context.Background(), hashKey)
	if err != nil {
		return gubernator.PeerInfo{}, err
	}
	return peer.Info(), nil
}

// DaemonAt returns a specific daemon
func DaemonAt(idx int) *gubernator.Daemon {
return daemons[idx]
Expand Down
160 changes: 160 additions & 0 deletions functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"os"
"strings"
"testing"
"time"

guber "github.com/mailgun/gubernator/v2"
"github.com/mailgun/gubernator/v2/cluster"
Expand All @@ -34,6 +35,10 @@ import (
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/resolver"
json "google.golang.org/protobuf/encoding/protojson"
)

Expand Down Expand Up @@ -859,6 +864,88 @@ func TestGlobalRateLimits(t *testing.T) {
})
}

// TestGlobalRateLimitsPeerOverLimit verifies that, in GLOBAL behavior, a
// non-owning peer reports OVER_LIMIT from its locally cached state once the
// owner's broadcast has depleted the remaining tokens, rather than waiting
// for the owner to answer.
func TestGlobalRateLimitsPeerOverLimit(t *testing.T) {
	const (
		name = "test_global_token_limit"
		key  = "account:12345"
	)

	// Make a connection to a peer in the cluster which does not own this rate limit
	client, err := getClientToNonOwningPeer(name, key)
	require.NoError(t, err)

	sendHit := func(expectedStatus guber.Status, hits int) {
		ctx, cancel := context.WithTimeout(context.Background(), clock.Hour*5)
		defer cancel()
		resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{
			Requests: []*guber.RateLimitReq{
				{
					Name:      name,
					UniqueKey: key,
					Algorithm: guber.Algorithm_TOKEN_BUCKET,
					Behavior:  guber.Behavior_GLOBAL,
					Duration:  guber.Minute * 5,
					// BUG FIX: previously hard-coded to 1, which made the
					// final zero-hit status probe consume a real hit.
					Hits:  int64(hits),
					Limit: 2,
				},
			},
		})
		assert.NoError(t, err)
		assert.Equal(t, "", resp.Responses[0].GetError())
		assert.Equal(t, expectedStatus, resp.Responses[0].GetStatus())
	}

	// Send two hits that should be processed by the owner and the broadcast to peer, depleting the remaining
	sendHit(guber.Status_UNDER_LIMIT, 1)
	sendHit(guber.Status_UNDER_LIMIT, 1)
	// Wait for the broadcast from the owner to the peer
	time.Sleep(time.Second * 3)
	// Since the remainder is 0, the peer should set OVER_LIMIT instead of waiting for the owner
	// to respond with OVER_LIMIT.
	sendHit(guber.Status_OVER_LIMIT, 1)
	// Wait for the broadcast from the owner to the peer
	time.Sleep(time.Second * 3)
	// The status should still be OVER_LIMIT
	sendHit(guber.Status_OVER_LIMIT, 0)
}

// TestGlobalRateLimitsPeerOverLimitLeaky is the leaky-bucket variant of
// TestGlobalRateLimitsPeerOverLimit: after the owner's broadcast drains the
// bucket, a non-owning peer must report OVER_LIMIT on the next hit.
func TestGlobalRateLimitsPeerOverLimitLeaky(t *testing.T) {
	const (
		name = "test_global_token_limit_leaky"
		key  = "account:12345"
	)

	// Make a connection to a peer in the cluster which does not own this rate limit
	client, err := getClientToNonOwningPeer(name, key)
	require.NoError(t, err)

	sendHit := func(expectedStatus guber.Status, hits int) {
		ctx, cancel := context.WithTimeout(context.Background(), clock.Hour*5)
		defer cancel()
		resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{
			Requests: []*guber.RateLimitReq{
				{
					Name:      name,
					UniqueKey: key,
					Algorithm: guber.Algorithm_LEAKY_BUCKET,
					Behavior:  guber.Behavior_GLOBAL,
					Duration:  guber.Minute * 5,
					// BUG FIX: previously hard-coded to 1, silently ignoring
					// the hits argument passed by callers.
					Hits:  int64(hits),
					Limit: 2,
				},
			},
		})
		assert.NoError(t, err)
		assert.Equal(t, "", resp.Responses[0].GetError())
		assert.Equal(t, expectedStatus, resp.Responses[0].GetStatus())
	}

	sendHit(guber.Status_UNDER_LIMIT, 1)
	sendHit(guber.Status_UNDER_LIMIT, 1)
	// Wait for the owner's broadcast to reach this peer before probing.
	time.Sleep(time.Second * 3)
	sendHit(guber.Status_OVER_LIMIT, 1)
}

func getMetricRequest(t testutil.TestingT, url string, name string) *model.Sample {
resp, err := http.Get(url)
require.NoError(t, err)
Expand Down Expand Up @@ -1273,3 +1360,76 @@ func getMetric(t testutil.TestingT, in io.Reader, name string) *model.Sample {
}
return nil
}

// staticBuilder is a gRPC resolver builder whose targets resolve to a fixed
// address list embedded in the target endpoint (scheme "static").
type staticBuilder struct{}

var _ resolver.Builder = (*staticBuilder)(nil)

// Scheme reports the URI scheme this builder handles.
func (sb *staticBuilder) Scheme() string { return "static" }

// Build parses the comma-separated address list from the target endpoint and
// publishes it to gRPC as a static address set.
func (sb *staticBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
	endpoints := strings.Split(target.Endpoint(), ",")
	addrs := make([]resolver.Address, 0, len(endpoints))
	for _, ep := range endpoints {
		addrs = append(addrs, resolver.Address{
			Addr:       ep,
			ServerName: ep,
		})
	}
	if err := cc.UpdateState(resolver.State{Addresses: addrs}); err != nil {
		return nil, err
	}
	return &staticResolver{cc: cc}, nil
}

// newStaticBuilder returns a builder which returns a staticResolver that tells GRPC
// to connect a specific peer in the cluster.
func newStaticBuilder() resolver.Builder {
	return new(staticBuilder)
}

// staticResolver is a no-op resolver: its address set is fixed at Build time
// and never changes afterwards.
type staticResolver struct {
	cc resolver.ClientConn
}

var _ resolver.Resolver = (*staticResolver)(nil)

// ResolveNow is a no-op; the static address set cannot be refreshed.
func (sr *staticResolver) ResolveNow(_ resolver.ResolveNowOptions) {}

// Close is a no-op; there is nothing to shut down.
func (sr *staticResolver) Close() {}

// findNonOwningPeer returns peer info for a peer in the cluster which does not
// own the rate limit for the name and key provided.
func findNonOwningPeer(name, key string) (guber.PeerInfo, error) {
	owner, err := cluster.FindOwningPeer(name, key)
	if err != nil {
		return guber.PeerInfo{}, err
	}

	peers := cluster.GetPeers()
	for _, candidate := range peers {
		if candidate.HashKey() == owner.HashKey() {
			continue
		}
		return candidate, nil
	}
	return guber.PeerInfo{}, fmt.Errorf("unable to find non-owning peer in '%d' node cluster",
		len(peers))
}

// getClientToNonOwningPeer returns a connection to a peer in the cluster which does not own
// the rate limit for the name and key provided.
func getClientToNonOwningPeer(name, key string) (guber.V1Client, error) {
	p, err := findNonOwningPeer(name, key)
	if err != nil {
		return nil, err
	}
	// Dial via the "static" resolver so gRPC connects to exactly this peer
	// instead of load-balancing across the cluster.
	conn, err := grpc.DialContext(context.Background(),
		fmt.Sprintf("static:///%s", p.GRPCAddress),
		grpc.WithResolvers(newStaticBuilder()),
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return nil, err
	}
	return guber.NewV1Client(conn), nil
}
44 changes: 15 additions & 29 deletions global.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@ import (

"github.com/mailgun/holster/v4/syncutil"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/protobuf/proto"
)

// globalManager manages async hit queue and updates peers in
// the cluster periodically when a global rate limit we own updates.
type globalManager struct {
asyncQueue chan *RateLimitReq
broadcastQueue chan *RateLimitReq
broadcastQueue chan *UpdatePeerGlobal
wg syncutil.WaitGroup
conf BehaviorConfig
log FieldLogger
Expand All @@ -43,7 +42,7 @@ func newGlobalManager(conf BehaviorConfig, instance *V1Instance) *globalManager
gm := globalManager{
log: instance.log,
asyncQueue: make(chan *RateLimitReq, conf.GlobalBatchLimit),
broadcastQueue: make(chan *RateLimitReq, conf.GlobalBatchLimit),
broadcastQueue: make(chan *UpdatePeerGlobal, conf.GlobalBatchLimit),
instance: instance,
conf: conf,
metricGlobalSendDuration: prometheus.NewSummary(prometheus.SummaryOpts{
Expand Down Expand Up @@ -74,8 +73,12 @@ func (gm *globalManager) QueueHit(r *RateLimitReq) {
gm.asyncQueue <- r
}

func (gm *globalManager) QueueUpdate(r *RateLimitReq) {
gm.broadcastQueue <- r
func (gm *globalManager) QueueUpdate(req *RateLimitReq, resp *RateLimitResp) {
gm.broadcastQueue <- &UpdatePeerGlobal{
Key: req.HashKey(),
Algorithm: req.Algorithm,
Status: resp,
}
}

// runAsyncHits collects async hit requests and queues them to
Expand Down Expand Up @@ -173,18 +176,18 @@ func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) {
// runBroadcasts collects status changes for global rate limits and broadcasts the changes to each peer in the cluster.
func (gm *globalManager) runBroadcasts() {
var interval = NewInterval(gm.conf.GlobalSyncWait)
updates := make(map[string]*RateLimitReq)
updates := make(map[string]*UpdatePeerGlobal)

gm.wg.Until(func(done chan struct{}) bool {
select {
case r := <-gm.broadcastQueue:
updates[r.HashKey()] = r
case updateReq := <-gm.broadcastQueue:
updates[updateReq.Key] = updateReq

// Send the hits if we reached our batch limit
if len(updates) >= gm.conf.GlobalBatchLimit {
gm.metricBroadcastCounter.WithLabelValues("queue_full").Inc()
gm.broadcastPeers(context.Background(), updates)
updates = make(map[string]*RateLimitReq)
updates = make(map[string]*UpdatePeerGlobal)
return true
}

Expand All @@ -198,7 +201,7 @@ func (gm *globalManager) runBroadcasts() {
if len(updates) != 0 {
gm.metricBroadcastCounter.WithLabelValues("timer").Inc()
gm.broadcastPeers(context.Background(), updates)
updates = make(map[string]*RateLimitReq)
updates = make(map[string]*UpdatePeerGlobal)
} else {
gm.metricGlobalQueueLength.Set(0)
}
Expand All @@ -210,31 +213,14 @@ func (gm *globalManager) runBroadcasts() {
}

// broadcastPeers broadcasts global rate limit statuses to all other peers
func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string]*RateLimitReq) {
func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string]*UpdatePeerGlobal) {
defer prometheus.NewTimer(gm.metricBroadcastDuration).ObserveDuration()
var req UpdatePeerGlobalsReq

gm.metricGlobalQueueLength.Set(float64(len(updates)))

for _, r := range updates {
// Copy the original since we are removing the GLOBAL behavior
rl := proto.Clone(r).(*RateLimitReq)
// We are only sending the status of the rate limit so, we
// clear the behavior flag, so we don't get queued for update again.
SetBehavior(&rl.Behavior, Behavior_GLOBAL, false)
rl.Hits = 0

status, err := gm.instance.getLocalRateLimit(ctx, rl)
if err != nil {
gm.log.WithError(err).Errorf("while broadcasting update to peers for: '%s'", rl.HashKey())
continue
}
// Build an UpdatePeerGlobalsReq
req.Globals = append(req.Globals, &UpdatePeerGlobal{
Algorithm: rl.Algorithm,
Key: rl.HashKey(),
Status: status,
})
req.Globals = append(req.Globals, r)
}

fan := syncutil.NewFanOut(gm.conf.GlobalPeerRequestsConcurrency)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0
github.com/hashicorp/memberlist v0.5.0
github.com/mailgun/errors v0.1.5
github.com/mailgun/holster/v4 v4.16.2-0.20231121154636-69040cb71a3b
github.com/mailgun/holster/v4 v4.16.3
github.com/miekg/dns v1.1.50
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.13.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mailgun/errors v0.1.5 h1:riRpZqfUKTdc8saXvoEg2tYkbRyZESU1KvQ3UxPbdus=
github.com/mailgun/errors v0.1.5/go.mod h1:lw+Nh4r/aoUTz6uK915FdfZJo3yq60gPiflFHNpK4NQ=
github.com/mailgun/holster/v4 v4.16.2-0.20231121154636-69040cb71a3b h1:ohMhrwmmA4JbXNukFpriztFWEVLlMuL90Cssg2Vl2TU=
github.com/mailgun/holster/v4 v4.16.2-0.20231121154636-69040cb71a3b/go.mod h1:phAg61z7LZ1PBfedyt2GXkGSlHhuVKK9AcVJO+Cm0/U=
github.com/mailgun/holster/v4 v4.16.3 h1:YMTkDoaFV83ViSaFuAfiyIvzrHJD1UNw7RjNv6J3Kfg=
github.com/mailgun/holster/v4 v4.16.3/go.mod h1:phAg61z7LZ1PBfedyt2GXkGSlHhuVKK9AcVJO+Cm0/U=
github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
Expand Down
Loading