Skip to content

Commit

Permalink
Moved redis circuit breaker to v2
Browse files Browse the repository at this point in the history
  • Loading branch information
Kalpit Pant committed Oct 29, 2024
1 parent 0488004 commit 36b0f9e
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 27 deletions.
9 changes: 9 additions & 0 deletions v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,16 @@ go 1.21
require github.com/stretchr/testify v1.8.4

require (
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
)

require (
github.com/alicebob/miniredis/v2 v2.33.0
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/redis/go-redis/v9 v9.7.0
gopkg.in/yaml.v3 v3.0.1 // indirect
)
12 changes: 12 additions & 0 deletions v2/go.sum
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis/v2 v2.33.0 h1:uvTF0EDeu9RLnUEG27Db5I68ESoIxTiXbNUiji6lZrA=
github.com/alicebob/miniredis/v2 v2.33.0/go.mod h1:MhP4a3EU7aENRi9aO+tHfTBZicLqQevyi/DJpoj6mi0=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down
37 changes: 19 additions & 18 deletions redis_circuit_breaker.go → v2/redis_circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ type CacheClient interface {
}

// RedisCircuitBreaker extends CircuitBreaker with Redis-based state storage
type RedisCircuitBreaker struct {
*CircuitBreaker
type RedisCircuitBreaker[T any] struct {
*CircuitBreaker[T]
redisClient CacheClient
}

Expand All @@ -27,9 +27,9 @@ type RedisSettings struct {
}

// NewRedisCircuitBreaker returns a new RedisCircuitBreaker configured with the given RedisSettings
func NewRedisCircuitBreaker(redisClient CacheClient, settings RedisSettings) *RedisCircuitBreaker {
cb := NewCircuitBreaker(settings.Settings)
return &RedisCircuitBreaker{
func NewRedisCircuitBreaker[T any](redisClient CacheClient, settings RedisSettings) *RedisCircuitBreaker[T] {
cb := NewCircuitBreaker[T](settings.Settings)
return &RedisCircuitBreaker[T]{
CircuitBreaker: cb,
redisClient: redisClient,
}
Expand All @@ -43,7 +43,7 @@ type RedisState struct {
Expiry time.Time `json:"expiry"`
}

func (rcb *RedisCircuitBreaker) State(ctx context.Context) State {
func (rcb *RedisCircuitBreaker[T]) State(ctx context.Context) State {
if rcb.redisClient == nil {
return rcb.CircuitBreaker.State()
}
Expand All @@ -70,13 +70,14 @@ func (rcb *RedisCircuitBreaker) State(ctx context.Context) State {
}

// Execute runs the given request if the RedisCircuitBreaker accepts it
func (rcb *RedisCircuitBreaker) Execute(ctx context.Context, req func() (interface{}, error)) (interface{}, error) {
func (rcb *RedisCircuitBreaker[T]) Execute(ctx context.Context, req func() (T, error)) (T, error) {
if rcb.redisClient == nil {
return rcb.CircuitBreaker.Execute(req)
}
generation, err := rcb.beforeRequest(ctx)
if err != nil {
return nil, err
var zero T
return zero, err
}

defer func() {
Expand All @@ -93,7 +94,7 @@ func (rcb *RedisCircuitBreaker) Execute(ctx context.Context, req func() (interfa
return result, err
}

func (rcb *RedisCircuitBreaker) beforeRequest(ctx context.Context) (uint64, error) {
func (rcb *RedisCircuitBreaker[T]) beforeRequest(ctx context.Context) (uint64, error) {
state, err := rcb.getRedisState(ctx)
if err != nil {
return 0, err
Expand Down Expand Up @@ -124,7 +125,7 @@ func (rcb *RedisCircuitBreaker) beforeRequest(ctx context.Context) (uint64, erro
return generation, nil
}

func (rcb *RedisCircuitBreaker) afterRequest(ctx context.Context, before uint64, success bool) {
func (rcb *RedisCircuitBreaker[T]) afterRequest(ctx context.Context, before uint64, success bool) {
state, err := rcb.getRedisState(ctx)
if err != nil {
return
Expand All @@ -144,7 +145,7 @@ func (rcb *RedisCircuitBreaker) afterRequest(ctx context.Context, before uint64,
rcb.setRedisState(ctx, state)
}

func (rcb *RedisCircuitBreaker) onSuccess(state *RedisState, currentState State, now time.Time) {
func (rcb *RedisCircuitBreaker[T]) onSuccess(state *RedisState, currentState State, now time.Time) {
if state.State == StateOpen {
state.State = currentState
}
Expand All @@ -160,7 +161,7 @@ func (rcb *RedisCircuitBreaker) onSuccess(state *RedisState, currentState State,
}
}

func (rcb *RedisCircuitBreaker) onFailure(state *RedisState, currentState State, now time.Time) {
func (rcb *RedisCircuitBreaker[T]) onFailure(state *RedisState, currentState State, now time.Time) {
switch currentState {
case StateClosed:
state.Counts.onFailure()
Expand All @@ -172,7 +173,7 @@ func (rcb *RedisCircuitBreaker) onFailure(state *RedisState, currentState State,
}
}

func (rcb *RedisCircuitBreaker) currentState(state RedisState, now time.Time) (State, uint64) {
func (rcb *RedisCircuitBreaker[T]) currentState(state RedisState, now time.Time) (State, uint64) {
switch state.State {
case StateClosed:
if !state.Expiry.IsZero() && state.Expiry.Before(now) {
Expand All @@ -186,7 +187,7 @@ func (rcb *RedisCircuitBreaker) currentState(state RedisState, now time.Time) (S
return state.State, state.Generation
}

func (rcb *RedisCircuitBreaker) setState(state *RedisState, newState State, now time.Time) {
func (rcb *RedisCircuitBreaker[T]) setState(state *RedisState, newState State, now time.Time) {
if state.State == newState {
return
}
Expand All @@ -201,7 +202,7 @@ func (rcb *RedisCircuitBreaker) setState(state *RedisState, newState State, now
}
}

func (rcb *RedisCircuitBreaker) toNewGeneration(state *RedisState, now time.Time) {
func (rcb *RedisCircuitBreaker[T]) toNewGeneration(state *RedisState, now time.Time) {

state.Generation++
state.Counts.clear()
Expand All @@ -221,11 +222,11 @@ func (rcb *RedisCircuitBreaker) toNewGeneration(state *RedisState, now time.Time
}
}

func (rcb *RedisCircuitBreaker) getRedisKey() string {
func (rcb *RedisCircuitBreaker[T]) getRedisKey() string {
return "cb:" + rcb.name
}

func (rcb *RedisCircuitBreaker) getRedisState(ctx context.Context) (RedisState, error) {
func (rcb *RedisCircuitBreaker[T]) getRedisState(ctx context.Context) (RedisState, error) {
var state RedisState
data, err := rcb.redisClient.Get(ctx, rcb.getRedisKey()).Bytes()
if err == redis.Nil {
Expand All @@ -239,7 +240,7 @@ func (rcb *RedisCircuitBreaker) getRedisState(ctx context.Context) (RedisState,
return state, err
}

func (rcb *RedisCircuitBreaker) setRedisState(ctx context.Context, state RedisState) error {
func (rcb *RedisCircuitBreaker[T]) setRedisState(ctx context.Context, state RedisState) error {
data, err := json.Marshal(state)
if err != nil {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (
"github.com/stretchr/testify/assert"
)

var defaultRCB *RedisCircuitBreaker
var customRCB *RedisCircuitBreaker
var defaultRCB *RedisCircuitBreaker[any]
var customRCB *RedisCircuitBreaker[any]

func setupTestWithMiniredis() (*RedisCircuitBreaker, *miniredis.Miniredis, *redis.Client) {
func setupTestWithMiniredis() (*RedisCircuitBreaker[any], *miniredis.Miniredis, *redis.Client) {
mr, err := miniredis.Run()
if err != nil {
panic(err)
Expand All @@ -24,7 +24,7 @@ func setupTestWithMiniredis() (*RedisCircuitBreaker, *miniredis.Miniredis, *redi
Addr: mr.Addr(),
})

return NewRedisCircuitBreaker(client, RedisSettings{
return NewRedisCircuitBreaker[any](client, RedisSettings{
Settings: Settings{
Name: "TestBreaker",
MaxRequests: 3,
Expand All @@ -37,7 +37,7 @@ func setupTestWithMiniredis() (*RedisCircuitBreaker, *miniredis.Miniredis, *redi
}), mr, client
}

func pseudoSleepRedis(ctx context.Context, rcb *RedisCircuitBreaker, period time.Duration) {
func pseudoSleepRedis(ctx context.Context, rcb *RedisCircuitBreaker[any], period time.Duration) {
state, _ := rcb.getRedisState(ctx)

state.Expiry = state.Expiry.Add(-period)
Expand All @@ -48,12 +48,12 @@ func pseudoSleepRedis(ctx context.Context, rcb *RedisCircuitBreaker, period time
rcb.setRedisState(ctx, state)
}

func successRequest(ctx context.Context, rcb *RedisCircuitBreaker) error {
func successRequest(ctx context.Context, rcb *RedisCircuitBreaker[any]) error {
_, err := rcb.Execute(ctx, func() (interface{}, error) { return nil, nil })
return err
}

func failRequest(ctx context.Context, rcb *RedisCircuitBreaker) error {
func failRequest(ctx context.Context, rcb *RedisCircuitBreaker[any]) error {
_, err := rcb.Execute(ctx, func() (interface{}, error) { return nil, errors.New("fail") })
if err != nil && err.Error() == "fail" {
return nil
Expand Down Expand Up @@ -184,7 +184,7 @@ func TestCustomRedisCircuitBreaker(t *testing.T) {
Addr: mr.Addr(),
})

customRCB = NewRedisCircuitBreaker(client, RedisSettings{
customRCB = NewRedisCircuitBreaker[any](client, RedisSettings{
Settings: Settings{
Name: "CustomBreaker",
MaxRequests: 3,
Expand Down Expand Up @@ -278,7 +278,7 @@ func TestCustomRedisCircuitBreakerStateTransitions(t *testing.T) {
Addr: mr.Addr(),
})

cb := NewRedisCircuitBreaker(client, RedisSettings{Settings: customSt})
cb := NewRedisCircuitBreaker[any](client, RedisSettings{Settings: customSt})

ctx := context.Background()

Expand Down

0 comments on commit 36b0f9e

Please sign in to comment.