Showing 18 changed files with 838 additions and 345 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG
@@ -4,6 +4,14 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.7.0] - 2019-12-10
### Added
* Added `Loader` interface for loading and saving rate limits at startup and shutdown only.
* Added `Store` interface for continuous synchronization between the persistent store and the cache.
### Changed
* Moved `cache.Cache` into the `gubernator` package.
* Changed the `Cache` interface to use `CacheItem` for storing and retrieving cached items.

## [0.6.0] - 2019-11-12
### Added
* DURATION_IS_GREGORIAN behavior to support interval-based rate limit durations
34 changes: 34 additions & 0 deletions README.md
@@ -140,6 +140,40 @@ Examples when using `Behavior = DURATION_IS_GREGORIAN`
* If `Duration = 0` (Minutes) then the rate limit will reset to `Current = 0` at the end of the minute the rate limit was created.
* If `Duration = 4` (Months) then the rate limit will reset to `Current = 0` at the end of the month the rate limit was created.
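
A rough sketch of such a request, using the client and request fields exercised by this commit's tests (the address and limit values are illustrative):

```go
// Assumes a running gubernator instance to dial.
client, err := gubernator.DialV1Server("127.0.0.1:9090")
if err != nil {
	log.Fatal(err)
}

resp, err := client.GetRateLimits(context.Background(), &gubernator.GetRateLimitsReq{
	Requests: []*gubernator.RateLimitReq{
		{
			Name:      "requests_per_month",
			UniqueKey: "account:1234",
			Algorithm: gubernator.Algorithm_TOKEN_BUCKET,
			Behavior:  gubernator.Behavior_DURATION_IS_GREGORIAN,
			Duration:  4, // Gregorian months; resets at the end of the current month
			Limit:     1000,
			Hits:      1,
		},
	},
})
if err != nil {
	log.Fatal(err)
}
log.Printf("remaining: %d", resp.Responses[0].Remaining)
```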

## Gubernator as a library
If you are using Go, you can use Gubernator as a library. This is useful if
you wish to implement a rate limit service with your own company-specific model
on top. We do this internally here at Mailgun with a service we creatively
called `ratelimits`, which keeps track of the limits imposed on a per-account
basis. In this way you can utilize the power and speed of Gubernator but still
layer business logic and integrate domain-specific problems into your rate
limiting service.

When you use the library, your service becomes a full member of the cluster,
participating in the same consistent hashing and caching as a standalone
Gubernator server would. All you need to do is provide the GRPC server instance
and tell Gubernator where the peers in your cluster are located.
`cmd/gubernator/main.go` is a great example of how to use Gubernator as a
library.
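
Below is a minimal sketch of that pattern, assembled only from the config and
cluster code in this commit (the listen address is an illustrative choice):

```go
package main

import (
	"log"
	"net"

	"github.com/mailgun/gubernator"
	"google.golang.org/grpc"
)

func main() {
	srv := grpc.NewServer()

	// gubernator registers its GRPC handlers on the provided server instance
	guber, err := gubernator.New(gubernator.Config{GRPCServer: srv})
	if err != nil {
		log.Fatal(err)
	}

	listener, err := net.Listen("tcp", "127.0.0.1:9090")
	if err != nil {
		log.Fatal(err)
	}
	go func() {
		if err := srv.Serve(listener); err != nil {
			log.Printf("while serving: %s", err)
		}
	}()

	// Tell gubernator where the peers in the cluster are; here a single-node
	// cluster that owns its own address
	guber.SetPeers([]gubernator.PeerInfo{{Address: listener.Addr().String(), IsOwner: true}})

	select {} // block forever
}
```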

### Optional Disk Persistence
While the Gubernator server currently doesn't directly support disk
persistence, the Gubernator library provides two interfaces through which
library users can implement persistence. Depending on the use case, an
implementor can implement the [Loader](/store.go) interface and only support
persistence of rate limits at startup and shutdown, or implement the
[Store](/store.go) interface, in which case Gubernator will continuously call
`OnChange()` and `Get()` to keep the in-memory cache and the persistent store
up to date with the latest rate limit data. Both interfaces *can* be
implemented simultaneously to ensure data is always saved to persistent
storage.
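
As a rough, in-memory sketch of the `Loader` contract, modeled on the
`MockLoader` this commit adds (a real implementation would read from and write
to durable storage instead of a slice):

```go
// memoryLoader is illustrative only; swap the slice for real durable storage.
type memoryLoader struct {
	items []*gubernator.CacheItem
}

var _ gubernator.Loader = &memoryLoader{}

// Load feeds previously saved rate limits to gubernator at startup; the
// channel must be closed once all items have been sent.
func (l *memoryLoader) Load() (chan *gubernator.CacheItem, error) {
	ch := make(chan *gubernator.CacheItem, 10)
	go func() {
		for _, item := range l.items {
			ch <- item
		}
		close(ch)
	}()
	return ch, nil
}

// Save drains the cache contents gubernator sends at shutdown; the channel
// must be read until it is closed.
func (l *memoryLoader) Save(in chan *gubernator.CacheItem) error {
	for item := range in {
		l.items = append(l.items, item)
	}
	return nil
}
```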

For those who choose to implement the `Store` interface, it is not required to
store ALL the rate limits received via `OnChange()`. For instance, if you wish
to support rate limit durations longer than a minute, day, or month, calls to
`OnChange()` can check the duration of a rate limit and decide to only persist
those rate limits whose durations exceed a self-determined limit.
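
For example, a `Store` along these lines would persist only long-lived rate
limits, sketched here against the interface this commit adds (the cutoff and
map-backed storage are illustrative assumptions):

```go
// longDurationStore persists only rate limits whose duration exceeds a
// self-determined cutoff; everything else lives solely in the cache.
type longDurationStore struct {
	cutoff int64                            // milliseconds
	items  map[string]*gubernator.CacheItem // stand-in for a persistent store
}

var _ gubernator.Store = &longDurationStore{}

func (s *longDurationStore) OnChange(r *gubernator.RateLimitReq, item *gubernator.CacheItem) {
	if r.Duration > s.cutoff {
		s.items[item.Key] = item
	}
}

func (s *longDurationStore) Get(r *gubernator.RateLimitReq) (*gubernator.CacheItem, bool) {
	item, ok := s.items[r.HashKey()]
	return item, ok
}

func (s *longDurationStore) Remove(key string) {
	delete(s.items, key)
}
```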

### API
All methods are accessed via GRPC but are also exposed via HTTP using the
[GRPC Gateway](https://github.com/grpc-ecosystem/grpc-gateway).
127 changes: 86 additions & 41 deletions algorithms.go
@@ -17,35 +17,52 @@ limitations under the License.
package gubernator

import (
"github.com/mailgun/gubernator/cache"
"time"
)

// Implements token bucket algorithm for rate limiting. https://en.wikipedia.org/wiki/Token_bucket
func tokenBucket(c cache.Cache, r *RateLimitReq) (*RateLimitResp, error) {
item, ok := c.Get(r.HashKey())
func tokenBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
item, ok := c.GetItem(r.HashKey())
if s != nil {
if !ok {
// Check our store for the item
if item, ok = s.Get(r); ok {
c.Add(item)
}
}
}

if ok {
// The following semantic allows for requests of more than the limit to be rejected, but subsequent
// requests within the same duration that are under the limit to succeed. For example: a client attempts to
// send 1000 emails but 100 is their limit. The request is rejected as over the limit, but since we
// don't store OVER_LIMIT in the cache the client can retry within the same rate limit duration with
// 100 emails and the request will succeed.

rl, ok := item.(*RateLimitResp)
rl, ok := item.Value.(*RateLimitResp)
if !ok {
// Client switched algorithms; perhaps due to a migration?
c.Remove(r.HashKey())
return tokenBucket(c, r)
if s != nil {
s.Remove(r.HashKey())
}
return tokenBucket(s, c, r)
}

// If we are already at the limit
if rl.Remaining == 0 {
rl.Status = Status_OVER_LIMIT
// Client is only interested in retrieving the current status
if r.Hits == 0 {
return rl, nil
}

// Client is only interested in retrieving the current status
if r.Hits == 0 {
if s != nil {
defer func() {
s.OnChange(r, item)
}()
}

// If we are already at the limit
if rl.Remaining == 0 {
rl.Status = Status_OVER_LIMIT
return rl, nil
}

@@ -67,9 +84,8 @@ func tokenBucket(c cache.Cache, r *RateLimitReq) (*RateLimitResp, error) {
}

// Add a new rate limit to the cache
expire := cache.MillisecondNow() + r.Duration
expire := MillisecondNow() + r.Duration
if r.Behavior == Behavior_DURATION_IS_GREGORIAN {
var err error
expire, err = GregorianExpiration(time.Now(), r.Duration)
if err != nil {
return nil, err
@@ -88,27 +104,42 @@ func tokenBucket(c cache.Cache, r *RateLimitReq) (*RateLimitResp, error) {
status.Remaining = r.Limit
}

c.Add(r.HashKey(), status, expire)
item = &CacheItem{
Algorithm: r.Algorithm,
Key: r.HashKey(),
Value: status,
ExpireAt: expire,
}

c.Add(item)
if s != nil {
s.OnChange(r, item)
}
return status, nil
}

// Implements leaky bucket algorithm for rate limiting https://en.wikipedia.org/wiki/Leaky_bucket
func leakyBucket(c cache.Cache, r *RateLimitReq) (*RateLimitResp, error) {
type LeakyBucket struct {
Limit int64
Duration int64
LimitRemaining int64
TimeStamp int64
func leakyBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
now := MillisecondNow()
item, ok := c.GetItem(r.HashKey())
if s != nil {
if !ok {
// Check our store for the item
if item, ok = s.Get(r); ok {
c.Add(item)
}
}
}

now := cache.MillisecondNow()
item, ok := c.Get(r.HashKey())
if ok {
b, ok := item.(*LeakyBucket)
b, ok := item.Value.(*LeakyBucketItem)
if !ok {
// Client switched algorithms; perhaps due to a migration?
c.Remove(r.HashKey())
return leakyBucket(c, r)
if s != nil {
s.Remove(r.HashKey())
}
return leakyBucket(s, c, r)
}

duration := r.Duration
@@ -134,19 +165,25 @@ func leakyBucket(c cache.Cache, r *RateLimitReq) (*RateLimitResp, error) {
elapsed := now - b.TimeStamp
leak := int64(elapsed / rate)

b.LimitRemaining += leak
if b.LimitRemaining > b.Limit {
b.LimitRemaining = b.Limit
b.Remaining += leak
if b.Remaining > b.Limit {
b.Remaining = b.Limit
}

rl := &RateLimitResp{
Limit: b.Limit,
Remaining: b.LimitRemaining,
Remaining: b.Remaining,
Status: Status_UNDER_LIMIT,
}

if s != nil {
defer func() {
s.OnChange(r, item)
}()
}

// If we are already at the limit
if b.LimitRemaining == 0 {
if b.Remaining == 0 {
rl.Status = Status_OVER_LIMIT
rl.ResetTime = now + rate
return rl, nil
@@ -158,15 +195,15 @@ func leakyBucket(c cache.Cache, r *RateLimitReq) (*RateLimitResp, error) {
}

// If requested hits takes the remainder
if b.LimitRemaining == r.Hits {
b.LimitRemaining = 0
if b.Remaining == r.Hits {
b.Remaining = 0
rl.Remaining = 0
return rl, nil
}

// If requested is more than available, then return over the limit
// without updating the bucket.
if r.Hits > b.LimitRemaining {
if r.Hits > b.Remaining {
rl.Status = Status_OVER_LIMIT
rl.ResetTime = now + rate
return rl, nil
@@ -177,8 +214,8 @@ func leakyBucket(c cache.Cache, r *RateLimitReq) (*RateLimitResp, error) {
return rl, nil
}

b.LimitRemaining -= r.Hits
rl.Remaining = b.LimitRemaining
b.Remaining -= r.Hits
rl.Remaining = b.Remaining
c.UpdateExpiration(r.HashKey(), now*duration)
return rl, nil
}
@@ -196,11 +233,11 @@ func leakyBucket(c cache.Cache, r *RateLimitReq) (*RateLimitResp, error) {
}

// Create a new leaky bucket
b := LeakyBucket{
LimitRemaining: r.Limit - r.Hits,
Limit: r.Limit,
Duration: duration,
TimeStamp: now,
b := LeakyBucketItem{
Remaining: r.Limit - r.Hits,
Limit: r.Limit,
Duration: duration,
TimeStamp: now,
}

rl := RateLimitResp{
@@ -214,10 +251,18 @@ func leakyBucket(c cache.Cache, r *RateLimitReq) (*RateLimitResp, error) {
if r.Hits > r.Limit {
rl.Status = Status_OVER_LIMIT
rl.Remaining = 0
b.LimitRemaining = 0
b.Remaining = 0
}

c.Add(r.HashKey(), &b, now+duration)

item = &CacheItem{
ExpireAt: now + duration,
Algorithm: r.Algorithm,
Key: r.HashKey(),
Value: &b,
}
c.Add(item)
if s != nil {
s.OnChange(r, item)
}
return &rl, nil
}
12 changes: 0 additions & 12 deletions architecture.md
@@ -76,17 +76,5 @@ limit request if the cluster is large enough. GLOBAL should only be used for
extremely high volume rate limits that don't scale well with the traditional
non `GLOBAL` behavior.

## Gubernator as a library
If you are using golang, you can use gubernator as a library. This is useful if
you wish to implement a rate limit service with your own company specific model
on top. We do this internally here at mailgun with a service we creatively
called `ratelimits` which keeps track of the limits imposed on a per account
basis. In this way you can utilize the power and speed of gubernator but still
layer business logic and integrate domain specific problems into your rate
limiting service.

When you use the library, your service becomes a full member of the cluster
participating in the same consistent hashing and caching as a stand alone
gubernator server would. All you need to do is provide the GRPC server instance
and tell gubernator where the peers in your cluster are located.

186 changes: 181 additions & 5 deletions cache.go
@@ -1,29 +1,205 @@
/*
Copyright 2018-2019 Mailgun Technologies Inc
Modifications Copyright 2018 Mailgun Technologies Inc
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
This work is derived from github.com/golang/groupcache/lru
*/

package gubernator

import (
"github.com/mailgun/gubernator/cache"
"fmt"
"sync"
"time"

"container/list"
"github.com/mailgun/holster"
"github.com/prometheus/client_golang/prometheus"
)

// So algorithms can interface with different cache implementations
type Cache interface {
// Access methods
Add(*CacheItem) bool
UpdateExpiration(key interface{}, expireAt int64) bool
GetItem(key interface{}) (value *CacheItem, ok bool)
Each() chan *CacheItem
Remove(key interface{})

// If the cache is exclusive, this will control access to the cache
Unlock()
Lock()
}

// cacheStats holds stats collected about the cache
type cacheStats struct {
Size int64
Miss int64
Hit int64
}

// LRUCache is a thread-unsafe LRU cache that supports expiration
type LRUCache struct {
cache map[interface{}]*list.Element
mutex sync.Mutex
ll *list.List
stats cacheStats
cacheSize int

// Stats
sizeMetric *prometheus.Desc
accessMetric *prometheus.Desc
}

type CacheItem struct {
Algorithm Algorithm
Key string
ExpireAt int64
Value interface{}
}

var _ Cache = &LRUCache{}

// NewLRUCache creates a new Cache with a maximum size
func NewCache(maxSize int) *cache.LRUCache {
func NewLRUCache(maxSize int) *LRUCache {
holster.SetDefault(&maxSize, 50000)

return cache.NewLRUCache(maxSize)
return &LRUCache{
cache: make(map[interface{}]*list.Element),
ll: list.New(),
cacheSize: maxSize,
sizeMetric: prometheus.NewDesc("cache_size",
"Size of the LRU Cache which holds the rate limits.", nil, nil),
accessMetric: prometheus.NewDesc("cache_access_count",
"Cache access counts.", []string{"type"}, nil),
}
}

func (c *LRUCache) Lock() {
c.mutex.Lock()
}

func (c *LRUCache) Unlock() {
c.mutex.Unlock()
}

func (c *LRUCache) Each() chan *CacheItem {
out := make(chan *CacheItem)
fmt.Printf("Each size: %d\n", len(c.cache))
go func() {
for _, ele := range c.cache {
out <- ele.Value.(*CacheItem)
}
close(out)
}()
return out
}

// Add adds a value to the cache.
func (c *LRUCache) Add(record *CacheItem) bool {
// If the key already exists, set the new value
if ee, ok := c.cache[record.Key]; ok {
c.ll.MoveToFront(ee)
temp := ee.Value.(*CacheItem)
*temp = *record
return true
}

ele := c.ll.PushFront(record)
c.cache[record.Key] = ele
if c.cacheSize != 0 && c.ll.Len() > c.cacheSize {
c.removeOldest()
}
return false
}

// MillisecondNow returns the current unix epoch in milliseconds
func MillisecondNow() int64 {
return time.Now().UnixNano() / 1000000
}

// GetItem returns the item stored in the cache
func (c *LRUCache) GetItem(key interface{}) (item *CacheItem, ok bool) {

if ele, hit := c.cache[key]; hit {
entry := ele.Value.(*CacheItem)

// If the entry has expired, remove it from the cache
if entry.ExpireAt < MillisecondNow() {
c.removeElement(ele)
c.stats.Miss++
return
}
c.stats.Hit++
c.ll.MoveToFront(ele)
return entry, true
}
c.stats.Miss++
return
}

// Remove removes the provided key from the cache.
func (c *LRUCache) Remove(key interface{}) {
if ele, hit := c.cache[key]; hit {
c.removeElement(ele)
}
}

// removeOldest removes the oldest item from the cache.
func (c *LRUCache) removeOldest() {
ele := c.ll.Back()
if ele != nil {
c.removeElement(ele)
}
}

func (c *LRUCache) removeElement(e *list.Element) {
c.ll.Remove(e)
kv := e.Value.(*CacheItem)
delete(c.cache, kv.Key)
}

// Size returns the number of items in the cache.
func (c *LRUCache) Size() int {
return c.ll.Len()
}

func (c *LRUCache) Stats(_ bool) cacheStats {
return c.stats
}

// UpdateExpiration updates the expiration time for the key
func (c *LRUCache) UpdateExpiration(key interface{}, expireAt int64) bool {
if ele, hit := c.cache[key]; hit {
entry := ele.Value.(*CacheItem)
entry.ExpireAt = expireAt
return true
}
return false
}

// Describe sends the cache's metric descriptors to prometheus for registration
func (c *LRUCache) Describe(ch chan<- *prometheus.Desc) {
ch <- c.sizeMetric
ch <- c.accessMetric
}

// Collect fetches metric counts and gauges from the cache
func (c *LRUCache) Collect(ch chan<- prometheus.Metric) {
c.mutex.Lock()
defer c.mutex.Unlock()
ch <- prometheus.MustNewConstMetric(c.accessMetric, prometheus.CounterValue, float64(c.stats.Hit), "hit")
ch <- prometheus.MustNewConstMetric(c.accessMetric, prometheus.CounterValue, float64(c.stats.Miss), "miss")
ch <- prometheus.MustNewConstMetric(c.sizeMetric, prometheus.GaugeValue, float64(len(c.cache)))
}
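
Taken together, the new cache surface looks like this from a caller's perspective. A small sketch using only what the diff above defines (key and values are illustrative), honoring the Lock/Unlock discipline since `LRUCache` itself is not thread safe:

```go
c := gubernator.NewLRUCache(0) // 0 selects the 50,000 item default

c.Lock() // callers serialize access when sharing the cache
c.Add(&gubernator.CacheItem{
	Algorithm: gubernator.Algorithm_TOKEN_BUCKET,
	Key:       "account:1234_requests_per_sec",
	Value:     &gubernator.RateLimitResp{Limit: 10, Remaining: 9},
	ExpireAt:  gubernator.MillisecondNow() + 1000, // one second from now
})
item, ok := c.GetItem("account:1234_requests_per_sec")
c.Unlock()

if ok {
	rl := item.Value.(*gubernator.RateLimitResp)
	fmt.Printf("remaining: %d\n", rl.Remaining)
}
```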
176 changes: 0 additions & 176 deletions cache/lru.go

This file was deleted.

45 changes: 0 additions & 45 deletions cache/types.go

This file was deleted.

64 changes: 40 additions & 24 deletions cluster/cluster.go
@@ -45,8 +45,10 @@ func (i *instance) Peers() []gubernator.PeerInfo {
return result
}

func (i *instance) Stop() {
func (i *instance) Stop() error {
err := i.Guber.Close()
i.GRPC.GracefulStop()
return err
}

var instances []*instance
@@ -76,39 +78,22 @@ func Start(numInstances int) error {
// Start a local cluster with specific addresses
func StartWith(addresses []string) error {
for _, address := range addresses {
srv := grpc.NewServer()

guber, err := gubernator.New(gubernator.Config{
GRPCServer: srv,
ins, err := StartInstance(address, gubernator.Config{
Behaviors: gubernator.BehaviorConfig{
GlobalSyncWait: time.Millisecond * 50, // Suitable for testing but not production
GlobalTimeout: time.Second,
},
})
if err != nil {
return errors.Wrap(err, "while creating new gubernator instance")
}

listener, err := net.Listen("tcp", address)
if err != nil {
return errors.Wrap(err, "while listening on random interface")
return errors.Wrapf(err, "while starting instance for addr '%s'", address)
}

go func() {
logrus.Infof("Listening on %s", listener.Addr().String())
if err := srv.Serve(listener); err != nil {
fmt.Printf("while serving: %s\n", err)
}
}()

peers = append(peers, listener.Addr().String())
instances = append(instances, &instance{
Address: listener.Addr().String(),
Guber: guber,
GRPC: srv,
})
// Add the peers and instances to the package level variables
peers = append(peers, ins.Address)
instances = append(instances, ins)
}

// Tell each instance about the other peers
for _, ins := range instances {
ins.Guber.SetPeers(ins.Peers())
}
@@ -120,3 +105,34 @@ func Stop() {
ins.Stop()
}
}

// StartInstance starts a single instance of gubernator with the provided config and listening address.
// If address is an empty string, a random port on the loopback device will be chosen.
func StartInstance(address string, conf gubernator.Config) (*instance, error) {
conf.GRPCServer = grpc.NewServer()

guber, err := gubernator.New(conf)
if err != nil {
return nil, errors.Wrap(err, "while creating new gubernator instance")
}

listener, err := net.Listen("tcp", address)
if err != nil {
return nil, errors.Wrap(err, "while listening on random interface")
}

go func() {
logrus.Infof("Listening on %s", listener.Addr().String())
if err := conf.GRPCServer.Serve(listener); err != nil {
fmt.Printf("while serving: %s\n", err)
}
}()

guber.SetPeers([]gubernator.PeerInfo{{Address: listener.Addr().String(), IsOwner: true}})

return &instance{
Address: listener.Addr().String(),
GRPC: conf.GRPCServer,
Guber: guber,
}, nil
}
7 changes: 3 additions & 4 deletions cmd/gubernator/main.go
@@ -18,18 +18,17 @@ package main

import (
"context"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"net"
"net/http"
"os"
"os/signal"

"github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/mailgun/gubernator"
"github.com/mailgun/gubernator/cache"
"github.com/mailgun/holster"
"github.com/mailgun/holster/etcdutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
)
@@ -47,7 +46,7 @@ func main() {
checkErr(err, "while getting config")

// The LRU cache we store rate limits in
cache := cache.NewLRUCache(conf.CacheSize)
cache := gubernator.NewLRUCache(conf.CacheSize)

// cache also implements prometheus.Collector interface
prometheus.MustRegister(cache)
18 changes: 14 additions & 4 deletions config.go
@@ -18,10 +18,10 @@ package gubernator

import (
"fmt"
"github.com/mailgun/gubernator/cache"
"time"

"github.com/mailgun/holster"
"google.golang.org/grpc"
"time"
)

// Config for a gubernator instance
@@ -33,7 +33,17 @@ type Config struct {
Behaviors BehaviorConfig

// (Optional) The cache implementation
Cache cache.Cache
Cache Cache

// (Optional) A persistent store implementation. Allows the implementor to store the rate limits this
// instance of gubernator owns. It's up to the implementor to decide which rate limits to persist.
// For instance, an implementor might only persist rate limits that have an expiration
// longer than 1 hour.
Store Store

// (Optional) A loader from a persistent store. Allows the implementor to load and save
// the contents of the cache when the gubernator instance is started and stopped.
Loader Loader

// (Optional) This is the peer picker algorithm the server will use to decide which peer in the cluster
// will coordinate a rate limit
@@ -66,7 +76,7 @@ func (c *Config) SetDefaults() error {
holster.SetDefault(&c.Behaviors.GlobalSyncWait, time.Microsecond*500)

holster.SetDefault(&c.Picker, NewConsistantHash(nil))
holster.SetDefault(&c.Cache, cache.NewLRUCache(0))
holster.SetDefault(&c.Cache, NewLRUCache(0))

if c.Behaviors.BatchLimit > maxBatchSize {
return fmt.Errorf("Behaviors.BatchLimit cannot exceed '%d'", maxBatchSize)
5 changes: 3 additions & 2 deletions global.go
@@ -207,8 +207,9 @@ func (gm *globalManager) updatePeers(updates map[string]*RateLimitReq) {
}
// Build an UpdatePeerGlobalsReq
req.Globals = append(req.Globals, &UpdatePeerGlobal{
Key: rl.HashKey(),
Status: status,
Algorithm: rl.Algorithm,
Key: rl.HashKey(),
Status: status,
})
}

57 changes: 47 additions & 10 deletions gubernator.go
@@ -40,11 +40,11 @@ const (
var log *logrus.Entry

type Instance struct {
wg holster.WaitGroup
health HealthCheckResp
global *globalManager
peerMutex sync.RWMutex
conf Config
isClosed bool
}

func New(conf Config) (*Instance, error) {
@@ -67,9 +67,42 @@ func New(conf Config) (*Instance, error) {
RegisterV1Server(conf.GRPCServer, &s)
RegisterPeersV1Server(conf.GRPCServer, &s)

if s.conf.Loader == nil {
return &s, nil
}

ch, err := s.conf.Loader.Load()
if err != nil {
return nil, errors.Wrap(err, "while loading from persistent store")
}

for item := range ch {
s.conf.Cache.Add(item)
}
return &s, nil
}

func (s *Instance) Close() error {
if s.isClosed {
return nil
}

if s.conf.Loader == nil {
return nil
}

out := make(chan *CacheItem, 500)
go func() {
for item := range s.conf.Cache.Each() {
fmt.Printf("Each: %+v\n", item)
out <- item
}
close(out)
}()
s.isClosed = true
return s.conf.Loader.Save(out)
}

// GetRateLimits is the public interface used by clients to request rate limits from the system. If the
// rate limit `Name` and `UniqueKey` is not owned by this instance, then we forward the request to the
// peer that does.
@@ -190,14 +223,13 @@ func (s *Instance) getGlobalRateLimit(req *RateLimitReq) (*RateLimitResp, error)
s.global.QueueHit(req)

s.conf.Cache.Lock()
item, ok := s.conf.Cache.Get(req.HashKey())
item, ok := s.conf.Cache.GetItem(req.HashKey())
s.conf.Cache.Unlock()
if ok {
rl, ok := item.(*RateLimitResp)
// Global rate limits are always stored as RateLimitResp regardless of algorithm
rl, ok := item.Value.(*RateLimitResp)
if !ok {
// Perhaps the rate limit algorithm was changed by the user.
s.conf.Cache.Remove(req.HashKey())
return s.getGlobalRateLimit(req)
return nil, errors.New("Global rate limit is of incorrect type")
}
return rl, nil
} else {
@@ -216,7 +248,12 @@ func (s *Instance) UpdatePeerGlobals(ctx context.Context, r *UpdatePeerGlobalsRe
defer s.conf.Cache.Unlock()

for _, g := range r.Globals {
s.conf.Cache.Add(g.Key, g.Status, g.Status.ResetTime)
s.conf.Cache.Add(&CacheItem{
ExpireAt: g.Status.ResetTime,
Algorithm: g.Algorithm,
Value: g.Status,
Key: g.Key,
})
}
return &UpdatePeerGlobalsResp{}, nil
}
@@ -258,9 +295,9 @@ func (s *Instance) getRateLimit(r *RateLimitReq) (*RateLimitResp, error) {

switch r.Algorithm {
case Algorithm_TOKEN_BUCKET:
return tokenBucket(s.conf.Cache, r)
return tokenBucket(s.conf.Store, s.conf.Cache, r)
case Algorithm_LEAKY_BUCKET:
return leakyBucket(s.conf.Cache, r)
return leakyBucket(s.conf.Store, s.conf.Cache, r)
}
return nil, errors.Errorf("invalid rate limit algorithm '%d'", r.Algorithm)
}
@@ -313,7 +350,7 @@ func (s *Instance) SetPeers(peers []PeerInfo) {
ctx, cancel := context.WithTimeout(context.Background(), s.conf.Behaviors.BatchTimeout)
defer cancel()

shutdownPeers := []*PeerClient{}
var shutdownPeers []*PeerClient

for _, p := range oldPicker.Peers() {
peerInfo := s.conf.Picker.GetPeerByHost(p.host)
2 changes: 1 addition & 1 deletion gubernator.pb.go
16 changes: 16 additions & 0 deletions interval_test.go
@@ -1,3 +1,19 @@
/*
Copyright 2018-2019 Mailgun Technologies Inc
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package gubernator_test

import (
51 changes: 30 additions & 21 deletions peers.pb.go
1 change: 1 addition & 0 deletions proto/peers.proto
@@ -52,5 +52,6 @@ message UpdatePeerGlobalsReq {
message UpdatePeerGlobal {
string key = 1;
RateLimitResp status = 2;
Algorithm algorithm = 3;
}
message UpdatePeerGlobalsResp {}
121 changes: 121 additions & 0 deletions store.go
@@ -0,0 +1,121 @@
package gubernator

// PERSISTENT STORE DETAILS

// The storage interfaces defined here allow the implementor flexibility in storage options. Depending on the
// use case, an implementor can implement only the `Loader` interface and support persistence of
// rate limits at startup and shutdown, or implement `Store` and gubernator will continuously call `OnChange()`
// and `Get()` to keep the in-memory cache and persistent store up to date with the latest rate limit data.
// Both interfaces can be implemented simultaneously to ensure data is always saved to persistent storage.

type LeakyBucketItem struct {
Limit int64
Duration int64
Remaining int64
TimeStamp int64
}

// Store interface allows implementors to offload storage of all or a subset of rate limits to
// some persistent store. Methods OnChange() and Get() should avoid blocking as much as possible, as these
// methods are called on every rate limit request and will affect the performance of gubernator.
type Store interface {
// Called by gubernator when a rate limit item is updated. It's up to the store to
// decide if this rate limit item should be persisted, and it's up to the
// store to expire old rate limit items.
OnChange(r *RateLimitReq, item *CacheItem)

// Called by gubernator when a rate limit is missing from the cache. It's up to the store
// to decide if this request can be fulfilled. Should return true if the item was found,
// and false if it doesn't exist in the store.
Get(r *RateLimitReq) (*CacheItem, bool)

// Called by gubernator when an existing rate limit should be removed from the store.
// NOTE: This is NOT called when a rate limit expires from the cache; store implementors
// must expire rate limits in the store.
Remove(key string)
}

// Loader interface allows implementors to store all or a subset of ratelimits into a persistent
// store during startup and shutdown of the gubernator instance.
type Loader interface {
// Load is called by gubernator just before the instance is ready to accept requests. The implementation
// should return a channel gubernator can read to load all rate limits that should be loaded into the
// instance cache. The implementation should close the channel to indicate there are no more rate limits to load.
Load() (chan *CacheItem, error)

// Save is called by gubernator just before the instance is shutdown. The passed channel should be
// read until the channel is closed.
Save(chan *CacheItem) error
}

func NewMockStore() *MockStore {
ml := &MockStore{
Called: make(map[string]int),
CacheItems: make(map[string]*CacheItem),
}
ml.Called["OnChange()"] = 0
ml.Called["Remove()"] = 0
ml.Called["Get()"] = 0
return ml
}

type MockStore struct {
Called map[string]int
CacheItems map[string]*CacheItem
}

var _ Store = &MockStore{}

func (ms *MockStore) OnChange(r *RateLimitReq, item *CacheItem) {
ms.Called["OnChange()"] += 1
ms.CacheItems[item.Key] = item
}

func (ms *MockStore) Get(r *RateLimitReq) (*CacheItem, bool) {
ms.Called["Get()"] += 1
item, ok := ms.CacheItems[r.HashKey()]
return item, ok
}

func (ms *MockStore) Remove(key string) {
ms.Called["Remove()"] += 1
delete(ms.CacheItems, key)
}

func NewMockLoader() *MockLoader {
ml := &MockLoader{
Called: make(map[string]int),
}
ml.Called["Load()"] = 0
ml.Called["Save()"] = 0
return ml
}

type MockLoader struct {
Called map[string]int
CacheItems []*CacheItem
}

var _ Loader = &MockLoader{}

func (ml *MockLoader) Load() (chan *CacheItem, error) {
ml.Called["Load()"] += 1

ch := make(chan *CacheItem, 10)
go func() {
for _, i := range ml.CacheItems {
ch <- i
}
close(ch)
}()
return ch, nil
}

func (ml *MockLoader) Save(in chan *CacheItem) error {
ml.Called["Save()"] += 1

for i := range in {
ml.CacheItems = append(ml.CacheItems, i)
}
return nil
}
253 changes: 253 additions & 0 deletions store_test.go
@@ -0,0 +1,253 @@
/*
Copyright 2018-2019 Mailgun Technologies Inc
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package gubernator_test

import (
"context"
"testing"
"time"

"github.com/mailgun/gubernator"
"github.com/mailgun/gubernator/cluster"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestLoader(t *testing.T) {
loader := gubernator.NewMockLoader()

ins, err := cluster.StartInstance("", gubernator.Config{
Behaviors: gubernator.BehaviorConfig{
GlobalSyncWait: time.Millisecond * 50, // Suitable for testing but not production
GlobalTimeout: time.Second,
},
Loader: loader,
})
assert.Nil(t, err)

// loader.Load() should have been called for gubernator startup
assert.Equal(t, 1, loader.Called["Load()"])
assert.Equal(t, 0, loader.Called["Save()"])

client, err := gubernator.DialV1Server(ins.Address)
assert.Nil(t, err)

resp, err := client.GetRateLimits(context.Background(), &gubernator.GetRateLimitsReq{
Requests: []*gubernator.RateLimitReq{
{
Name: "test_over_limit",
UniqueKey: "account:1234",
Algorithm: gubernator.Algorithm_TOKEN_BUCKET,
Duration: gubernator.Second,
Limit: 2,
Hits: 1,
},
},
})
require.Nil(t, err)
require.NotNil(t, resp)
require.Equal(t, 1, len(resp.Responses))
require.Equal(t, "", resp.Responses[0].Error)

err = ins.Stop()
require.Nil(t, err)

// Loader.Save() should have been called during gubernator shutdown
assert.Equal(t, 1, loader.Called["Load()"])
assert.Equal(t, 1, loader.Called["Save()"])

// Loader instance should have 1 rate limit
require.Equal(t, 1, len(loader.CacheItems))
item, ok := loader.CacheItems[0].Value.(*gubernator.RateLimitResp)
require.Equal(t, true, ok)
assert.Equal(t, int64(2), item.Limit)
assert.Equal(t, int64(1), item.Remaining)
assert.Equal(t, gubernator.Status_UNDER_LIMIT, item.Status)
}

func TestStore(t *testing.T) {
tests := []struct {
name string
firstRemaining int64
firstStatus gubernator.Status
secondRemaining int64
secondStatus gubernator.Status
algorithm gubernator.Algorithm
switchAlgorithm gubernator.Algorithm
testCase func(gubernator.RateLimitReq, *gubernator.MockStore)
}{
{
name: "Given there are no token bucket limits in the store",
firstRemaining: int64(9),
firstStatus: gubernator.Status_UNDER_LIMIT,
secondRemaining: int64(8),
secondStatus: gubernator.Status_UNDER_LIMIT,
algorithm: gubernator.Algorithm_TOKEN_BUCKET,
switchAlgorithm: gubernator.Algorithm_LEAKY_BUCKET,
testCase: func(req gubernator.RateLimitReq, store *gubernator.MockStore) {},
},
{
name: "Given the store contains a token bucket rate limit not in the guber cache",
firstRemaining: int64(0),
firstStatus: gubernator.Status_UNDER_LIMIT,
secondRemaining: int64(0),
secondStatus: gubernator.Status_OVER_LIMIT,
algorithm: gubernator.Algorithm_TOKEN_BUCKET,
switchAlgorithm: gubernator.Algorithm_LEAKY_BUCKET,
testCase: func(req gubernator.RateLimitReq, store *gubernator.MockStore) {
// Expire 1 second from now
expire := gubernator.MillisecondNow() + gubernator.Second
store.CacheItems[req.HashKey()] = &gubernator.CacheItem{
Algorithm: gubernator.Algorithm_TOKEN_BUCKET,
ExpireAt: expire,
Key: req.HashKey(),
Value: &gubernator.RateLimitResp{
ResetTime: expire,
Limit: req.Limit,
Remaining: 1,
},
}
},
},
{
name: "Given there are no leaky bucket limits in the store",
firstRemaining: int64(9),
firstStatus: gubernator.Status_UNDER_LIMIT,
secondRemaining: int64(8),
secondStatus: gubernator.Status_UNDER_LIMIT,
algorithm: gubernator.Algorithm_LEAKY_BUCKET,
switchAlgorithm: gubernator.Algorithm_TOKEN_BUCKET,
testCase: func(req gubernator.RateLimitReq, store *gubernator.MockStore) {},
},
{
name: "Given the store contains a leaky bucket rate limit not in the guber cache",
firstRemaining: int64(0),
firstStatus: gubernator.Status_UNDER_LIMIT,
secondRemaining: int64(0),
secondStatus: gubernator.Status_OVER_LIMIT,
algorithm: gubernator.Algorithm_LEAKY_BUCKET,
switchAlgorithm: gubernator.Algorithm_TOKEN_BUCKET,
testCase: func(req gubernator.RateLimitReq, store *gubernator.MockStore) {
// Expire 1 second from now
expire := gubernator.MillisecondNow() + gubernator.Second
store.CacheItems[req.HashKey()] = &gubernator.CacheItem{
Algorithm: gubernator.Algorithm_LEAKY_BUCKET,
ExpireAt: expire,
Key: req.HashKey(),
Value: &gubernator.LeakyBucketItem{
TimeStamp: gubernator.MillisecondNow(),
Duration: req.Duration,
Limit: req.Limit,
Remaining: 1,
},
}
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
store := gubernator.NewMockStore()

ins, err := cluster.StartInstance("", gubernator.Config{
Behaviors: gubernator.BehaviorConfig{
GlobalSyncWait: time.Millisecond * 50, // Suitable for testing but not production
GlobalTimeout: time.Second,
},
Store: store,
})
assert.Nil(t, err)

// No calls to store
assert.Equal(t, 0, store.Called["OnChange()"])
assert.Equal(t, 0, store.Called["Get()"])

client, err := gubernator.DialV1Server(ins.Address)
assert.Nil(t, err)

req := gubernator.RateLimitReq{
Name: "test_over_limit",
UniqueKey: "account:1234",
Algorithm: tt.algorithm,
Duration: gubernator.Second,
Limit: 10,
Hits: 1,
}

tt.testCase(req, store)

// This request for the rate limit should ask the store via Get() and then
// tell the store about the change to the rate limit by calling OnChange()
resp, err := client.GetRateLimits(context.Background(), &gubernator.GetRateLimitsReq{
Requests: []*gubernator.RateLimitReq{&req},
})
require.Nil(t, err)
require.NotNil(t, resp)
require.Equal(t, 1, len(resp.Responses))
require.Equal(t, "", resp.Responses[0].Error)
assert.Equal(t, tt.firstRemaining, resp.Responses[0].Remaining)
assert.Equal(t, int64(10), resp.Responses[0].Limit)
assert.Equal(t, tt.firstStatus, resp.Responses[0].Status)

// Should have called OnChange() and Get()
assert.Equal(t, 1, store.Called["OnChange()"])
assert.Equal(t, 1, store.Called["Get()"])

// Should have updated the store
assert.Equal(t, tt.firstRemaining, getRemaining(store.CacheItems[req.HashKey()]))

// Next call should not call `Get()` but only `OnChange()`
resp, err = client.GetRateLimits(context.Background(), &gubernator.GetRateLimitsReq{
Requests: []*gubernator.RateLimitReq{&req},
})
require.Nil(t, err)
require.NotNil(t, resp)
assert.Equal(t, tt.secondRemaining, resp.Responses[0].Remaining)
assert.Equal(t, int64(10), resp.Responses[0].Limit)
assert.Equal(t, tt.secondStatus, resp.Responses[0].Status)

// Should have called OnChange() not Get() since rate limit is in the cache
assert.Equal(t, 2, store.Called["OnChange()"])
assert.Equal(t, 1, store.Called["Get()"])

// Should have updated the store
assert.Equal(t, tt.secondRemaining, getRemaining(store.CacheItems[req.HashKey()]))

// Should have called `Remove()` when algorithm changed
req.Algorithm = tt.switchAlgorithm
resp, err = client.GetRateLimits(context.Background(), &gubernator.GetRateLimitsReq{
Requests: []*gubernator.RateLimitReq{&req},
})
require.Nil(t, err)
require.NotNil(t, resp)
assert.Equal(t, 1, store.Called["Remove()"])
assert.Equal(t, 3, store.Called["OnChange()"])
assert.Equal(t, 2, store.Called["Get()"])

assert.Equal(t, tt.switchAlgorithm, store.CacheItems[req.HashKey()].Algorithm)
})
}
}

func getRemaining(item *gubernator.CacheItem) int64 {
switch item.Algorithm {
case gubernator.Algorithm_TOKEN_BUCKET:
return item.Value.(*gubernator.RateLimitResp).Remaining
case gubernator.Algorithm_LEAKY_BUCKET:
return item.Value.(*gubernator.LeakyBucketItem).Remaining
}
return 0
}
