Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
Change global behavior (#219)
Browse files Browse the repository at this point in the history
* test: Add test for global rate limiting with load balancing

* fix global update behavior

* Added findNonOwningPeer() and getClientToNonOwningPeer()

* Fix global mode

* remove logs and add comment

Co-authored-by: Yamil Asusta <[email protected]>

* fixed global over-consume issue and improved global testing

---------

Co-authored-by: Philip Gough <[email protected]>
Co-authored-by: Yamil Asusta <[email protected]>
Co-authored-by: Maria Ines Parnisari <[email protected]>
  • Loading branch information
4 people authored Feb 21, 2024
1 parent 6f1e32a commit a312ed7
Show file tree
Hide file tree
Showing 10 changed files with 673 additions and 134 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/on-pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ jobs:
skip-cache: true

- name: Test
run: go test -v -race -p=1 -count=1
run: go test -v -race -p=1 -count=1 -tags holster_test_mode
go-bench:
runs-on: ubuntu-latest
timeout-minutes: 30
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ lint: $(GOLANGCI_LINT)

.PHONY: test
test:
(go test -v -race -p=1 -count=1 -coverprofile coverage.out ./...; ret=$$?; \
(go test -v -race -p=1 -count=1 -tags holster_test_mode -coverprofile coverage.out ./...; ret=$$?; \
go tool cover -func coverage.out; \
go tool cover -html coverage.out -o coverage.html; \
exit $$ret)
Expand Down
23 changes: 13 additions & 10 deletions algorithms.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ import (
"go.opentelemetry.io/otel/trace"
)

// ### NOTE ###
// The both token and leaky follow the same semantic which allows for requests of more than the limit
// to be rejected, but subsequent requests within the same window that are under the limit to succeed.
// IE: client attempts to send 1000 emails but 100 is their limit. The request is rejected as over the
// limit, but we do not set the remainder to 0 in the cache. The client can retry within the same window
// with 100 emails and the request will succeed. You can override this default behavior with `DRAIN_OVER_LIMIT`

// Implements token bucket algorithm for rate limiting. https://en.wikipedia.org/wiki/Token_bucket
func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {

Expand Down Expand Up @@ -82,12 +89,6 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
ResetTime: 0,
}, nil
}

// The following semantic allows for requests of more than the limit to be rejected, but subsequent
// requests within the same duration that are under the limit to succeed. IE: client attempts to
// send 1000 emails but 100 is their limit. The request is rejected as over the limit, but since we
// don't store OVER_LIMIT in the cache the client can retry within the same rate limit duration with
// 100 emails and the request will succeed.
t, ok := item.Value.(*TokenBucketItem)
if !ok {
// Client switched algorithms; perhaps due to a migration?
Expand Down Expand Up @@ -388,22 +389,24 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

// If requested hits takes the remainder
if int64(b.Remaining) == r.Hits {
b.Remaining -= float64(r.Hits)
rl.Remaining = 0
b.Remaining = 0
rl.Remaining = int64(b.Remaining)
rl.ResetTime = now + (rl.Limit-rl.Remaining)*int64(rate)
return rl, nil
}

// If requested is more than available, then return over the limit
// without updating the bucket.
// without updating the bucket, unless `DRAIN_OVER_LIMIT` is set.
if r.Hits > int64(b.Remaining) {
metricOverLimitCounter.Add(1)
rl.Status = Status_OVER_LIMIT

// DRAIN_OVER_LIMIT behavior drains the remaining counter.
if HasBehavior(r.Behavior, Behavior_DRAIN_OVER_LIMIT) {
// DRAIN_OVER_LIMIT behavior drains the remaining counter.
b.Remaining = 0
rl.Remaining = 0
}

return rl, nil
}

Expand Down
51 changes: 48 additions & 3 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,47 @@ func PeerAt(idx int) gubernator.PeerInfo {
return peers[idx]
}

// FindOwningPeer finds the peer which owns the rate limit with the provided name and unique key
func FindOwningPeer(name, key string) (gubernator.PeerInfo, error) {
p, err := daemons[0].V1Server.GetPeer(context.Background(), name+"_"+key)
if err != nil {
return gubernator.PeerInfo{}, err
}
return p.Info(), nil
}

// FindOwningDaemon finds the daemon which owns the rate limit with the provided name and unique key
func FindOwningDaemon(name, key string) (*gubernator.Daemon, error) {
p, err := daemons[0].V1Server.GetPeer(context.Background(), name+"_"+key)
if err != nil {
return &gubernator.Daemon{}, err
}

for i, d := range daemons {
if d.PeerInfo.GRPCAddress == p.Info().GRPCAddress {
return daemons[i], nil
}
}
return &gubernator.Daemon{}, errors.New("unable to find owning daemon")
}

// ListNonOwningDaemons returns a list of daemons in the cluster that do not own the rate limit
// for the name and key provided.
func ListNonOwningDaemons(name, key string) ([]*gubernator.Daemon, error) {
owner, err := FindOwningDaemon(name, key)
if err != nil {
return []*gubernator.Daemon{}, err
}

var daemons []*gubernator.Daemon
for _, d := range GetDaemons() {
if d.PeerInfo.GRPCAddress != owner.PeerInfo.GRPCAddress {
daemons = append(daemons, d)
}
}
return daemons, nil
}

// DaemonAt returns a specific daemon
func DaemonAt(idx int) *gubernator.Daemon {
return daemons[idx]
Expand Down Expand Up @@ -112,6 +153,7 @@ func StartWith(localPeers []gubernator.PeerInfo) error {
ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10)
d, err := gubernator.SpawnDaemon(ctx, gubernator.DaemonConfig{
Logger: logrus.WithField("instance", peer.GRPCAddress),
InstanceID: peer.GRPCAddress,
GRPCListenAddress: peer.GRPCAddress,
HTTPListenAddress: peer.HTTPAddress,
DataCenter: peer.DataCenter,
Expand All @@ -127,12 +169,15 @@ func StartWith(localPeers []gubernator.PeerInfo) error {
return errors.Wrapf(err, "while starting server for addr '%s'", peer.GRPCAddress)
}

// Add the peers and daemons to the package level variables
peers = append(peers, gubernator.PeerInfo{
p := gubernator.PeerInfo{
GRPCAddress: d.GRPCListeners[0].Addr().String(),
HTTPAddress: d.HTTPListener.Addr().String(),
DataCenter: peer.DataCenter,
})
}
d.PeerInfo = p

// Add the peers and daemons to the package level variables
peers = append(peers, p)
daemons = append(daemons, d)
}

Expand Down
2 changes: 2 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ type BehaviorConfig struct {

// Config for a gubernator instance
type Config struct {
InstanceID string

// (Required) A list of GRPC servers to register our instance with
GRPCServers []*grpc.Server

Expand Down
77 changes: 73 additions & 4 deletions daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package gubernator
import (
"context"
"crypto/tls"
"fmt"
"log"
"net"
"net/http"
Expand All @@ -40,13 +41,16 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
"google.golang.org/protobuf/encoding/protojson"
)

type Daemon struct {
GRPCListeners []net.Listener
HTTPListener net.Listener
V1Server *V1Instance
InstanceID string
PeerInfo PeerInfo

log FieldLogger
pool PoolInterface
Expand All @@ -59,6 +63,7 @@ type Daemon struct {
promRegister *prometheus.Registry
gwCancel context.CancelFunc
instanceConf Config
client V1Client
}

// SpawnDaemon starts a new gubernator daemon according to the provided DaemonConfig.
Expand All @@ -67,8 +72,9 @@ type Daemon struct {
func SpawnDaemon(ctx context.Context, conf DaemonConfig) (*Daemon, error) {

s := &Daemon{
log: conf.Logger,
conf: conf,
InstanceID: conf.InstanceID,
log: conf.Logger,
conf: conf,
}
return s, s.Start(ctx)
}
Expand All @@ -77,8 +83,8 @@ func (s *Daemon) Start(ctx context.Context) error {
var err error

setter.SetDefault(&s.log, logrus.WithFields(logrus.Fields{
"instance-id": s.conf.InstanceID,
"category": "gubernator",
"instance": s.conf.InstanceID,
"category": "gubernator",
}))

s.promRegister = prometheus.NewRegistry()
Expand Down Expand Up @@ -148,6 +154,7 @@ func (s *Daemon) Start(ctx context.Context) error {
Behaviors: s.conf.Behaviors,
CacheSize: s.conf.CacheSize,
Workers: s.conf.Workers,
InstanceID: s.conf.InstanceID,
}

s.V1Server, err = NewV1Instance(s.instanceConf)
Expand Down Expand Up @@ -411,6 +418,30 @@ func (s *Daemon) Peers() []PeerInfo {
return peers
}

func (s *Daemon) MustClient() V1Client {
c, err := s.Client()
if err != nil {
panic(fmt.Sprintf("[%s] failed to init daemon client - '%s'", s.InstanceID, err))
}
return c
}

func (s *Daemon) Client() (V1Client, error) {
if s.client != nil {
return s.client, nil
}

conn, err := grpc.DialContext(context.Background(),
fmt.Sprintf("static:///%s", s.PeerInfo.GRPCAddress),
grpc.WithResolvers(newStaticBuilder()),
grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, err
}
s.client = NewV1Client(conn)
return s.client, nil
}

// WaitForConnect returns nil if the list of addresses is listening
// for connections; will block until context is cancelled.
func WaitForConnect(ctx context.Context, addresses []string) error {
Expand Down Expand Up @@ -451,3 +482,41 @@ func WaitForConnect(ctx context.Context, addresses []string) error {
}
return nil
}

type staticBuilder struct{}

var _ resolver.Builder = (*staticBuilder)(nil)

func (sb *staticBuilder) Scheme() string {
return "static"
}

func (sb *staticBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
var resolverAddrs []resolver.Address
for _, address := range strings.Split(target.Endpoint(), ",") {
resolverAddrs = append(resolverAddrs, resolver.Address{
Addr: address,
ServerName: address,
})
}
if err := cc.UpdateState(resolver.State{Addresses: resolverAddrs}); err != nil {
return nil, err
}
return &staticResolver{cc: cc}, nil
}

// newStaticBuilder returns a builder which returns a staticResolver that tells GRPC
// to connect a specific peer in the cluster.
func newStaticBuilder() resolver.Builder {
return &staticBuilder{}
}

type staticResolver struct {
cc resolver.ClientConn
}

func (sr *staticResolver) ResolveNow(_ resolver.ResolveNowOptions) {}

func (sr *staticResolver) Close() {}

var _ resolver.Resolver = (*staticResolver)(nil)
Loading

0 comments on commit a312ed7

Please sign in to comment.