Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
Now using holster clock instead of time for most time calculations
Browse files Browse the repository at this point in the history
  • Loading branch information
thrawn01 committed Oct 23, 2020
1 parent d7efdde commit 632440f
Show file tree
Hide file tree
Showing 19 changed files with 135 additions and 129 deletions.
12 changes: 6 additions & 6 deletions algorithms.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
package gubernator

import (
"time"
"github.com/mailgun/holster/v3/clock"
)

// Implements token bucket algorithm for rate limiting. https://en.wikipedia.org/wiki/Token_bucket
Expand Down Expand Up @@ -88,7 +88,7 @@ func tokenBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er
if t.Duration != r.Duration {
expire := t.CreatedAt + r.Duration
if HasBehavior(r.Behavior, Behavior_DURATION_IS_GREGORIAN) {
expire, err = GregorianExpiration(time.Now(), r.Duration)
expire, err = GregorianExpiration(clock.Now(), r.Duration)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -138,7 +138,7 @@ func tokenBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er
now := MillisecondNow()
expire := now + r.Duration
if HasBehavior(r.Behavior, Behavior_DURATION_IS_GREGORIAN) {
expire, err = GregorianExpiration(time.Now(), r.Duration)
expire, err = GregorianExpiration(clock.Now(), r.Duration)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -214,11 +214,11 @@ func leakyBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er
duration := r.Duration
rate := float64(duration) / float64(r.Limit)
if HasBehavior(r.Behavior, Behavior_DURATION_IS_GREGORIAN) {
d, err := GregorianDuration(time.Now(), r.Duration)
d, err := GregorianDuration(clock.Now(), r.Duration)
if err != nil {
return nil, err
}
n := time.Now()
n := clock.Now()
expire, err := GregorianExpiration(n, r.Duration)
if err != nil {
return nil, err
Expand Down Expand Up @@ -291,7 +291,7 @@ func leakyBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er

duration := r.Duration
if HasBehavior(r.Behavior, Behavior_DURATION_IS_GREGORIAN) {
n := time.Now()
n := clock.Now()
expire, err := GregorianExpiration(n, r.Duration)
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ package gubernator
import (
"container/list"
"sync"
"time"

"github.com/mailgun/holster/v3/clock"
"github.com/mailgun/holster/v3/setter"
"github.com/prometheus/client_golang/prometheus"
)
Expand Down Expand Up @@ -131,7 +131,7 @@ func (c *LRUCache) Add(record *CacheItem) bool {

// Return unix epoch in milliseconds
func MillisecondNow() int64 {
return time.Now().UnixNano() / 1000000
return clock.Now().UnixNano() / 1000000
}

// GetItem returns the item stored in the cache
Expand Down
16 changes: 8 additions & 8 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ package gubernator

import (
"math/rand"
"time"

"github.com/mailgun/holster/v3/clock"
"github.com/pkg/errors"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -48,18 +48,18 @@ func DialV1Server(server string) (V1Client, error) {
return NewV1Client(conn), nil
}

// Convert a time.Duration to a unix millisecond timestamp
func ToTimeStamp(duration time.Duration) int64 {
return int64(duration / time.Millisecond)
// Convert a clock.Duration to a unix millisecond timestamp
func ToTimeStamp(duration clock.Duration) int64 {
return int64(duration / clock.Millisecond)
}

// Convert a unix millisecond timestamp to a time.Duration
func FromTimeStamp(ts int64) time.Duration {
return time.Now().Sub(FromUnixMilliseconds(ts))
func FromTimeStamp(ts int64) clock.Duration {
return clock.Now().Sub(FromUnixMilliseconds(ts))
}

func FromUnixMilliseconds(ts int64) time.Time {
return time.Unix(0, ts*int64(time.Millisecond))
func FromUnixMilliseconds(ts int64) clock.Time {
return clock.Unix(0, ts*int64(clock.Millisecond))
}

// Given a list of peers, return a random peer
Expand Down
12 changes: 6 additions & 6 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ package cluster
import (
"context"
"math/rand"
"time"

"github.com/mailgun/gubernator"
"github.com/mailgun/holster/v3/clock"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -76,17 +76,17 @@ func Restart(ctx context.Context) {
// Start a local cluster with specific addresses
func StartWith(localPeers []gubernator.PeerInfo) error {
for _, peer := range localPeers {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10)
d, err := gubernator.SpawnDaemon(ctx, gubernator.DaemonConfig{
Logger: logrus.WithField("instance", peer.GRPCAddress),
GRPCListenAddress: peer.GRPCAddress,
HTTPListenAddress: peer.HTTPAddress,
Behaviors: gubernator.BehaviorConfig{
// Suitable for testing but not production
GlobalSyncWait: time.Millisecond * 50,
GlobalTimeout: time.Second * 5,
BatchTimeout: time.Second * 5,
MultiRegionTimeout: time.Second * 5,
GlobalSyncWait: clock.Millisecond * 50,
GlobalTimeout: clock.Second * 5,
BatchTimeout: clock.Second * 5,
MultiRegionTimeout: clock.Second * 5,
},
})
cancel()
Expand Down
6 changes: 3 additions & 3 deletions cmd/gubernator-cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
"fmt"
"math/rand"
"os"
"time"

"github.com/davecgh/go-spew/spew"
guber "github.com/mailgun/gubernator"
"github.com/mailgun/holster/v3/clock"
"github.com/mailgun/holster/v3/syncutil"
)

Expand Down Expand Up @@ -57,7 +57,7 @@ func main() {
UniqueKey: guber.RandomString(10),
Hits: 1,
Limit: randInt(1, 10),
Duration: randInt(int(time.Millisecond*500), int(time.Second*6)),
Duration: randInt(int(clock.Millisecond*500), int(clock.Second*6)),
Algorithm: guber.Algorithm_TOKEN_BUCKET,
})
}
Expand All @@ -67,7 +67,7 @@ func main() {
for _, rateLimit := range rateLimits {
fan.Run(func(obj interface{}) error {
r := obj.(*guber.RateLimitReq)
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
ctx, cancel := context.WithTimeout(context.Background(), clock.Millisecond*500)
// Now hit our cluster with the rate limits
resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{r},
Expand Down
10 changes: 5 additions & 5 deletions cmd/gubernator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ import (
"runtime"
"strconv"
"strings"
"time"

etcd "github.com/coreos/etcd/clientv3"
"github.com/davecgh/go-spew/spew"
"github.com/mailgun/gubernator"
"github.com/mailgun/holster/v3/clock"
"github.com/mailgun/holster/v3/setter"
"github.com/mailgun/holster/v3/slice"
"github.com/pkg/errors"
Expand All @@ -59,7 +59,7 @@ func main() {
conf, err := confFromFile(configFile)
checkErr(err, "while getting config")

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10)
defer cancel()

// Start the daemon
Expand Down Expand Up @@ -142,7 +142,7 @@ func confFromFile(configFile string) (gubernator.DaemonConfig, error) {
setter.SetDefault(&conf.EtcdPoolConf.KeyPrefix, os.Getenv("GUBER_ETCD_KEY_PREFIX"), "/gubernator-peers")
setter.SetDefault(&conf.EtcdPoolConf.EtcdConfig, &etcd.Config{})
setter.SetDefault(&conf.EtcdPoolConf.EtcdConfig.Endpoints, getEnvSlice("GUBER_ETCD_ENDPOINTS"), []string{"localhost:2379"})
setter.SetDefault(&conf.EtcdPoolConf.EtcdConfig.DialTimeout, getEnvDuration("GUBER_ETCD_DIAL_TIMEOUT"), time.Second*5)
setter.SetDefault(&conf.EtcdPoolConf.EtcdConfig.DialTimeout, getEnvDuration("GUBER_ETCD_DIAL_TIMEOUT"), clock.Second*5)
setter.SetDefault(&conf.EtcdPoolConf.EtcdConfig.Username, os.Getenv("GUBER_ETCD_USER"))
setter.SetDefault(&conf.EtcdPoolConf.EtcdConfig.Password, os.Getenv("GUBER_ETCD_PASSWORD"))
setter.SetDefault(&conf.EtcdPoolConf.AdvertiseAddress, os.Getenv("GUBER_ETCD_ADVERTISE_ADDRESS"), conf.AdvertiseAddress)
Expand Down Expand Up @@ -319,12 +319,12 @@ func getEnvInteger(name string) int {
return int(i)
}

func getEnvDuration(name string) time.Duration {
func getEnvDuration(name string) clock.Duration {
v := os.Getenv(name)
if v == "" {
return 0
}
d, err := time.ParseDuration(v)
d, err := clock.ParseDuration(v)
if err != nil {
log.WithError(err).Errorf("while parsing '%s' as a duration", name)
return 0
Expand Down
18 changes: 9 additions & 9 deletions etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@ package gubernator
import (
"context"
"encoding/json"
"time"

etcd "github.com/coreos/etcd/clientv3"
"github.com/mailgun/holster/v3/clock"
"github.com/mailgun/holster/v3/setter"
"github.com/mailgun/holster/v3/syncutil"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

const (
etcdTimeout = time.Second * 10
backOffTimeout = time.Second * 5
etcdTimeout = clock.Second * 10
backOffTimeout = clock.Second * 5
leaseTTL = 30
defaultBaseKey = "/gubernator/peers/"
)
Expand Down Expand Up @@ -135,7 +135,7 @@ func (e *EtcdPool) watchPeers() error {
select {
case <-ready:
e.log.Infof("watching for peer changes '%s' at revision %d", e.conf.KeyPrefix, revision)
case <-time.After(etcdTimeout):
case <-clock.After(etcdTimeout):
return errors.New("timed out while waiting for watcher.Watch() to start")
}
return nil
Expand Down Expand Up @@ -221,7 +221,7 @@ func (e *EtcdPool) watch() error {
e.log.WithError(err).
Error("while attempting to restart watch")
select {
case <-time.After(backOffTimeout):
case <-clock.After(backOffTimeout):
return true
case <-done:
return false
Expand Down Expand Up @@ -269,7 +269,7 @@ func (e *EtcdPool) register(name string) error {
return nil
}

var lastKeepAlive time.Time
var lastKeepAlive clock.Time

// Attempt to register our instance with etcd
if err = register(); err != nil {
Expand All @@ -283,7 +283,7 @@ func (e *EtcdPool) register(name string) error {
e.log.WithError(err).
Error("while attempting to re-register peer")
select {
case <-time.After(backOffTimeout):
case <-clock.After(backOffTimeout):
return true
case <-done:
return false
Expand All @@ -306,12 +306,12 @@ func (e *EtcdPool) register(name string) error {
}

// Ensure we are getting keep alive's regularly
if lastKeepAlive.Sub(time.Now()) > time.Second*leaseTTL {
if lastKeepAlive.Sub(clock.Now()) > clock.Second*leaseTTL {
e.log.Warn("to long between keep alive heartbeats, re-registering peer")
keepAlive = nil
return true
}
lastKeepAlive = time.Now()
lastKeepAlive = clock.Now()
case <-done:
ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
if _, err := e.conf.Client.Delete(ctx, instanceKey); err != nil {
Expand Down
Loading

0 comments on commit 632440f

Please sign in to comment.