Skip to content

Commit

Permalink
WIP: New V3.0 Release
Browse files Browse the repository at this point in the history
  • Loading branch information
thrawn01 committed Apr 16, 2024
1 parent b5893f5 commit 66cc345
Show file tree
Hide file tree
Showing 50 changed files with 2,205 additions and 3,614 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/master.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 15
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5

- uses: actions/setup-go@v5
with:
Expand Down Expand Up @@ -40,7 +40,7 @@ jobs:
comment-on-alert: true

- name: Save benchmark JSON to cache
uses: actions/cache/save@v4
uses: actions/cache/save@v5
with:
path: ./cache/benchmark-data.json
# Save with commit hash to avoid "cache already exists"
Expand Down
52 changes: 15 additions & 37 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ Gubernator is a distributed, high performance, cloud native and stateless rate-l
kubernetes or nomad trivial.
* Gubernator holds no state on disk, It’s configuration is passed to it by the
client on a per-request basis.
* Gubernator provides both GRPC and HTTP access to the API.
* It Can be run as a sidecar to services that need rate limiting or as a separate service.
* It Can be used as a library to implement a domain-specific rate limiting service.
* Supports optional eventually consistent rate limit distribution for extremely
Expand All @@ -38,8 +37,10 @@ $ docker-compose up -d
```
Now you can make rate limit requests via CURL
```
# Hit the HTTP API at localhost:9080 (GRPC is at 9081)
$ curl http://localhost:9080/v1/HealthCheck
# Hit the HTTP API at localhost:9080
$ curl http://localhost:9080/v1/health.check
# TODO: Update this example
# Make a rate limit request
$ curl http://localhost:9080/v1/GetRateLimits \
Expand All @@ -59,7 +60,7 @@ $ curl http://localhost:9080/v1/GetRateLimits \

### ProtoBuf Structure

An example rate limit request sent via GRPC might look like the following
An example rate limit request sent with protobuf might look like the following
```yaml
rate_limits:
# Scopes the request to a specific rate limit
Expand Down Expand Up @@ -214,7 +215,7 @@ limiting service.

When you use the library, your service becomes a full member of the cluster
participating in the same consistent hashing and caching as a stand alone
Gubernator server would. All you need to do is provide the GRPC server instance
Gubernator server would. All you need to do is provide the server instance
and tell Gubernator where the peers in your cluster are located. The
`cmd/gubernator/main.go` is a great example of how to use Gubernator as a
library.
Expand All @@ -238,21 +239,13 @@ to support rate limit durations longer than a minute, day or month, calls to
those rate limits that have durations over a self determined limit.

### API
All methods are accessed via GRPC but are also exposed via HTTP using the
[GRPC Gateway](https://github.com/grpc-ecosystem/grpc-gateway)

#### Health Check
Health check returns `unhealthy` in the event a peer is reported by etcd or kubernetes
as `up` but the server instance is unable to contact that peer via it's advertised address.

###### GRPC
```grpc
rpc HealthCheck (HealthCheckReq) returns (HealthCheckResp)
```

###### HTTP
```
GET /v1/HealthCheck
GET /v1/health.check
```
Example response:
Expand All @@ -269,14 +262,8 @@ Rate limits can be applied or retrieved using this interface. If the client
makes a request to the server with `hits: 0` then current state of the rate
limit is retrieved but not incremented.

###### GRPC
```grpc
rpc GetRateLimits (GetRateLimitsReq) returns (GetRateLimitsResp)
```

###### HTTP
```
POST /v1/GetRateLimits
POST /v1/rate-limit.check
```

Example Payload
Expand All @@ -285,7 +272,7 @@ Example Payload
"requests": [
{
"name": "requests_per_sec",
"uniqueKey": "account:12345",
"unique_key": "account:12345",
"hits": "1",
"limit": "10",
"duration": "1000"
Expand Down Expand Up @@ -314,20 +301,10 @@ Example response:
```

### Deployment
NOTE: Gubernator uses `etcd`, Kubernetes or round-robin DNS to discover peers and
NOTE: Gubernator uses `memberlist` Kubernetes or round-robin DNS to discover peers and
establish a cluster. If you don't have either, the docker-compose method is the
simplest way to try gubernator out.


##### Docker with existing etcd cluster
```bash
$ docker run -p 8081:81 -p 9080:80 -e GUBER_ETCD_ENDPOINTS=etcd1:2379,etcd2:2379 \
ghcr.io/gubernator-io/gubernator:latest

# Hit the HTTP API at localhost:9080
$ curl http://localhost:9080/v1/HealthCheck
```

##### Kubernetes
```bash
# Download the kubernetes deployment spec
Expand All @@ -346,14 +323,15 @@ you can use same fully-qualified domain name to both let your business logic con
instances to find `gubernator` and for `gubernator` containers/instances to find each other.

##### TLS
Gubernator supports TLS for both HTTP and GRPC connections. You can see an example with
self signed certs by running `docker-compose-tls.yaml`
Gubernator supports TLS. You can see an example with self-signed certs by running
`docker-compose-tls.yaml`
```bash
# Run docker compose
$ docker-compose -f docker-compose-tls.yaml up -d

# Hit the HTTP API at localhost:9080 (GRPC is at 9081)
$ curl --cacert certs/ca.cert --cert certs/gubernator.pem --key certs/gubernator.key https://localhost:9080/v1/HealthCheck
# Hit the HTTP API at localhost:9080
+$ curl -X POST --cacert certs/ca.cert --cert certs/gubernator.pem \
--key certs/gubernator.key https://localhost:9080/v1/health.check
```

### Configuration
Expand Down
25 changes: 9 additions & 16 deletions algorithms.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"

"github.com/mailgun/holster/v4/clock"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
Expand All @@ -34,10 +33,7 @@ import (
// with 100 emails and the request will succeed. You can override this default behavior with `DRAIN_OVER_LIMIT`

// Implements token bucket algorithm for rate limiting. https://en.wikipedia.org/wiki/Token_bucket
func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, reqState RateLimitReqState) (resp *RateLimitResp, err error) {
tokenBucketTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("tokenBucket"))
defer tokenBucketTimer.ObserveDuration()

func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitRequest, reqState RateLimitRequestState) (resp *RateLimitResponse, err error) {
// Get rate limit from cache.
hashKey := r.HashKey()
item, ok := c.GetItem(hashKey)
Expand Down Expand Up @@ -81,7 +77,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, reqStat
if s != nil {
s.Remove(ctx, hashKey)
}
return &RateLimitResp{
return &RateLimitResponse{
Status: Status_UNDER_LIMIT,
Limit: r.Limit,
Remaining: r.Limit,
Expand Down Expand Up @@ -112,7 +108,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, reqStat
t.Limit = r.Limit
}

rl := &RateLimitResp{
rl := &RateLimitResponse{
Status: t.Status,
Limit: r.Limit,
Remaining: t.Remaining,
Expand Down Expand Up @@ -203,7 +199,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, reqStat
}

// Called by tokenBucket() when adding a new item in the store.
func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq, reqState RateLimitReqState) (resp *RateLimitResp, err error) {
func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitRequest, reqState RateLimitRequestState) (resp *RateLimitResponse, err error) {
createdAt := *r.CreatedAt
expire := createdAt + r.Duration

Expand All @@ -229,7 +225,7 @@ func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq,
ExpireAt: expire,
}

rl := &RateLimitResp{
rl := &RateLimitResponse{
Status: Status_UNDER_LIMIT,
Limit: r.Limit,
Remaining: t.Remaining,
Expand Down Expand Up @@ -257,10 +253,7 @@ func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq,
}

// Implements leaky bucket algorithm for rate limiting https://en.wikipedia.org/wiki/Leaky_bucket
func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, reqState RateLimitReqState) (resp *RateLimitResp, err error) {
leakyBucketTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.getRateLimit_leakyBucket"))
defer leakyBucketTimer.ObserveDuration()

func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitRequest, reqState RateLimitRequestState) (resp *RateLimitResponse, err error) {
if r.Burst == 0 {
r.Burst = r.Limit
}
Expand Down Expand Up @@ -370,7 +363,7 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, reqStat
b.Remaining = float64(b.Burst)
}

rl := &RateLimitResp{
rl := &RateLimitResponse{
Limit: b.Limit,
Remaining: int64(b.Remaining),
Status: Status_UNDER_LIMIT,
Expand Down Expand Up @@ -434,7 +427,7 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, reqStat
}

// Called by leakyBucket() when adding a new item in the store.
func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq, reqState RateLimitReqState) (resp *RateLimitResp, err error) {
func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitRequest, reqState RateLimitRequestState) (resp *RateLimitResponse, err error) {
createdAt := *r.CreatedAt
duration := r.Duration
rate := float64(duration) / float64(r.Limit)
Expand All @@ -458,7 +451,7 @@ func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq,
Burst: r.Burst,
}

rl := RateLimitResp{
rl := RateLimitResponse{
Status: Status_UNDER_LIMIT,
Limit: b.Limit,
Remaining: r.Burst - r.Hits,
Expand Down
2 changes: 1 addition & 1 deletion benchmark_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"testing"
"time"

"github.com/gubernator-io/gubernator/v2"
"github.com/gubernator-io/gubernator/v3"
"github.com/mailgun/holster/v4/clock"
)

Expand Down
37 changes: 31 additions & 6 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,40 @@ package gubernator_test

import (
"context"
"fmt"
"os"
"testing"

guber "github.com/gubernator-io/gubernator/v2"
"github.com/gubernator-io/gubernator/v2/cluster"
guber "github.com/gubernator-io/gubernator/v3"
"github.com/gubernator-io/gubernator/v3/cluster"
"github.com/mailgun/holster/v4/clock"
"github.com/mailgun/holster/v4/syncutil"
"github.com/stretchr/testify/require"
)

+// go test benchmark_test.go -bench=BenchmarkTrace -benchtime=20s -trace=trace.out
+// go tool trace trace.out
+func BenchmarkTrace(b *testing.B) {
if err := cluster.StartWith([]guber.PeerInfo{
{HTTPAddress: "127.0.0.1:9980", DataCenter: cluster.DataCenterNone},
{HTTPAddress: "127.0.0.1:9981", DataCenter: cluster.DataCenterNone},
{HTTPAddress: "127.0.0.1:9982", DataCenter: cluster.DataCenterNone},
{HTTPAddress: "127.0.0.1:9983", DataCenter: cluster.DataCenterNone},
{HTTPAddress: "127.0.0.1:9984", DataCenter: cluster.DataCenterNone},
{HTTPAddress: "127.0.0.1:9985", DataCenter: cluster.DataCenterNone},

// DataCenterOne
{HTTPAddress: "127.0.0.1:9880", DataCenter: cluster.DataCenterOne},
{HTTPAddress: "127.0.0.1:9881", DataCenter: cluster.DataCenterOne},
{HTTPAddress: "127.0.0.1:9882", DataCenter: cluster.DataCenterOne},
{HTTPAddress: "127.0.0.1:9883", DataCenter: cluster.DataCenterOne},
}); err != nil {
fmt.Println(err)
os.Exit(1)
}
defer cluster.Stop()
}

func BenchmarkServer(b *testing.B) {
ctx := context.Background()
conf := guber.Config{}
Expand All @@ -45,7 +70,7 @@ func BenchmarkServer(b *testing.B) {
b.ResetTimer()

for n := 0; n < b.N; n++ {
_, err := client.GetPeerRateLimit(ctx, &guber.RateLimitReq{
_, err := client.GetPeerRateLimit(ctx, &guber.RateLimitRequest{
Name: b.Name(),
UniqueKey: guber.RandomString(10),
// Behavior: guber.Behavior_NO_BATCHING,
Expand All @@ -67,7 +92,7 @@ func BenchmarkServer(b *testing.B) {

for n := 0; n < b.N; n++ {
_, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
Requests: []*guber.RateLimitRequest{
{
Name: b.Name(),
UniqueKey: guber.RandomString(10),
Expand All @@ -90,7 +115,7 @@ func BenchmarkServer(b *testing.B) {

for n := 0; n < b.N; n++ {
_, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
Requests: []*guber.RateLimitRequest{
{
Name: b.Name(),
UniqueKey: guber.RandomString(10),
Expand Down Expand Up @@ -128,7 +153,7 @@ func BenchmarkServer(b *testing.B) {
for n := 0; n < b.N; n++ {
fan.Run(func(o interface{}) error {
_, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
Requests: []*guber.RateLimitRequest{
{
Name: b.Name(),
UniqueKey: guber.RandomString(10),
Expand Down
Loading

0 comments on commit 66cc345

Please sign in to comment.