Skip to content

Commit

Permalink
Introduce distributed mutex (#84)
Browse files Browse the repository at this point in the history
* Introduce distributed mutex

* Update distributed_gobreaker.go

* Update distributed_gobreaker_test.go

* Update distributed_gobreaker_test.go

* Update distributed_gobreaker_test.go

* Update go.mod

* Update go.mod

* Fix errors

* gofmt
  • Loading branch information
YoshiyukiMineo authored Dec 29, 2024
1 parent aaba3f4 commit c8765ba
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 32 deletions.
98 changes: 83 additions & 15 deletions v2/distributed_gobreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type SharedState struct {

// SharedDataStore stores the shared state of DistributedCircuitBreaker.
type SharedDataStore interface {
Lock(name string) error
Unlock(name string) error
GetData(name string) ([]byte, error)
SetData(name string, data []byte) error
}
Expand All @@ -34,17 +36,28 @@ type DistributedCircuitBreaker[T any] struct {
}

// NewDistributedCircuitBreaker returns a new DistributedCircuitBreaker.
func NewDistributedCircuitBreaker[T any](store SharedDataStore, settings Settings) (*DistributedCircuitBreaker[T], error) {
func NewDistributedCircuitBreaker[T any](store SharedDataStore, settings Settings) (dcb *DistributedCircuitBreaker[T], err error) {
if store == nil {
return nil, ErrNoSharedStore
}

dcb := &DistributedCircuitBreaker[T]{
dcb = &DistributedCircuitBreaker[T]{
CircuitBreaker: NewCircuitBreaker[T](settings),
store: store,
}

_, err := dcb.getSharedState()
err = dcb.lock()
if err != nil {
return nil, err
}
defer func() {
e := dcb.unlock()
if err == nil {
err = e
}
}()

_, err = dcb.getSharedState()
if err == ErrNoSharedState {
err = dcb.setSharedState(dcb.extract())
}
Expand All @@ -55,8 +68,43 @@ func NewDistributedCircuitBreaker[T any](store SharedDataStore, settings Setting
return dcb, nil
}

const (
mutexTimeout = 5 * time.Second
mutexWaitTime = 500 * time.Millisecond
)

func (dcb *DistributedCircuitBreaker[T]) mutexKey() string {
return "gobreaker:mutex:" + dcb.name
}

func (dcb *DistributedCircuitBreaker[T]) lock() error {
if dcb.store == nil {
return ErrNoSharedStore
}

var err error
expiry := time.Now().Add(mutexTimeout)
for time.Now().Before(expiry) {
err = dcb.store.Lock(dcb.mutexKey())
if err == nil {
return nil
}

time.Sleep(mutexWaitTime)
}
return err
}

func (dcb *DistributedCircuitBreaker[T]) unlock() error {
if dcb.store == nil {
return ErrNoSharedStore
}

return dcb.store.Unlock(dcb.mutexKey())
}

func (dcb *DistributedCircuitBreaker[T]) sharedStateKey() string {
return "gobreaker:" + dcb.name
return "gobreaker:state:" + dcb.name
}

func (dcb *DistributedCircuitBreaker[T]) getSharedState() (SharedState, error) {
Expand Down Expand Up @@ -112,37 +160,57 @@ func (dcb *DistributedCircuitBreaker[T]) extract() SharedState {
}

// State returns the State of DistributedCircuitBreaker.
func (dcb *DistributedCircuitBreaker[T]) State() (State, error) {
func (dcb *DistributedCircuitBreaker[T]) State() (state State, err error) {
shared, err := dcb.getSharedState()
if err != nil {
return shared.State, err
}

err = dcb.lock()
if err != nil {
return state, err
}
defer func() {
e := dcb.unlock()
if err == nil {
err = e
}
}()

dcb.inject(shared)
state := dcb.CircuitBreaker.State()
state = dcb.CircuitBreaker.State()
shared = dcb.extract()

err = dcb.setSharedState(shared)
return state, err
}

// Execute runs the given request if the DistributedCircuitBreaker accepts it.
func (dcb *DistributedCircuitBreaker[T]) Execute(req func() (T, error)) (T, error) {
func (dcb *DistributedCircuitBreaker[T]) Execute(req func() (T, error)) (t T, err error) {
shared, err := dcb.getSharedState()
if err != nil {
var defaultValue T
return defaultValue, err
return t, err
}

err = dcb.lock()
if err != nil {
return t, err
}
defer func() {
e := dcb.unlock()
if err == nil {
err = e
}
}()

dcb.inject(shared)
t, e := dcb.CircuitBreaker.Execute(req)
t, err = dcb.CircuitBreaker.Execute(req)
shared = dcb.extract()

err = dcb.setSharedState(shared)
if err != nil {
var defaultValue T
return defaultValue, err
e := dcb.setSharedState(shared)
if e != nil {
return t, e
}

return t, e
return t, err
}
59 changes: 43 additions & 16 deletions v2/distributed_gobreaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,57 @@ import (
"time"

"github.com/alicebob/miniredis/v2"
"github.com/go-redsync/redsync/v4"
"github.com/go-redsync/redsync/v4/redis/goredis/v9"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
)

type storeAdapter struct {
ctx context.Context
client *redis.Client
rs *redsync.Redsync
mutex map[string]*redsync.Mutex
}

func (sa *storeAdapter) GetData(key string) ([]byte, error) {
return sa.client.Get(sa.ctx, key).Bytes()
func newStoreAdapter(client *redis.Client) *storeAdapter {
return &storeAdapter{
ctx: context.Background(),
client: client,
rs: redsync.New(goredis.NewPool(client)),
mutex: map[string]*redsync.Mutex{},
}
}

func (sa *storeAdapter) Lock(name string) error {
mutex, ok := sa.mutex[name]
if ok {
return mutex.Lock()
}

mutex = sa.rs.NewMutex(name, redsync.WithExpiry(mutexTimeout))
sa.mutex[name] = mutex
return mutex.Lock()
}

func (sa *storeAdapter) Unlock(name string) error {
mutex, ok := sa.mutex[name]
if ok {
var err error
ok, err = mutex.Unlock()
if ok && err == nil {
return nil
}
}
return errors.New("unlock failed")
}

func (sa *storeAdapter) SetData(key string, value []byte) error {
return sa.client.Set(sa.ctx, key, value, 0).Err()
func (sa *storeAdapter) GetData(name string) ([]byte, error) {
return sa.client.Get(sa.ctx, name).Bytes()
}

func (sa *storeAdapter) SetData(name string, data []byte) error {
return sa.client.Set(sa.ctx, name, data, 0).Err()
}

var redisServer *miniredis.Miniredis
Expand All @@ -37,10 +73,7 @@ func setUpDCB() *DistributedCircuitBreaker[any] {
Addr: redisServer.Addr(),
})

store := &storeAdapter{
ctx: context.Background(),
client: client,
}
store := newStoreAdapter(client)

dcb, err := NewDistributedCircuitBreaker[any](store, Settings{
Name: "TestBreaker",
Expand Down Expand Up @@ -205,10 +238,7 @@ func TestCustomDistributedCircuitBreaker(t *testing.T) {
Addr: mr.Addr(),
})

store := &storeAdapter{
ctx: context.Background(),
client: client,
}
store := newStoreAdapter(client)

customDCB, err = NewDistributedCircuitBreaker[any](store, Settings{
Name: "CustomBreaker",
Expand Down Expand Up @@ -301,10 +331,7 @@ func TestCustomDistributedCircuitBreakerStateTransitions(t *testing.T) {
Addr: mr.Addr(),
})

store := &storeAdapter{
ctx: context.Background(),
client: client,
}
store := newStoreAdapter(client)

dcb, err := NewDistributedCircuitBreaker[any](store, customSt)
assert.NoError(t, err)
Expand Down
7 changes: 6 additions & 1 deletion v2/go.mod
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
module github.com/sony/gobreaker/v2

go 1.21
go 1.22

toolchain go1.22.10

require github.com/stretchr/testify v1.8.4

require (
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
)

require (
github.com/alicebob/miniredis/v2 v2.33.0
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-redsync/redsync/v4 v4.13.0
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/redis/go-redis/v9 v9.7.0
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
25 changes: 25 additions & 0 deletions v2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,45 @@ github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZp
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis/v2 v2.33.0 h1:uvTF0EDeu9RLnUEG27Db5I68ESoIxTiXbNUiji6lZrA=
github.com/alicebob/miniredis/v2 v2.33.0/go.mod h1:MhP4a3EU7aENRi9aO+tHfTBZicLqQevyi/DJpoj6mi0=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-redis/redis/v7 v7.4.1 h1:PASvf36gyUpr2zdOUS/9Zqc80GbM+9BDyiJSJDDOrTI=
github.com/go-redis/redis/v7 v7.4.1/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/go-redsync/redsync/v4 v4.13.0 h1:49X6GJfnbLGaIpBBREM/zA4uIMDXKAh1NDkvQ1EkZKA=
github.com/go-redsync/redsync/v4 v4.13.0/go.mod h1:HMW4Q224GZQz6x1Xc7040Yfgacukdzu7ifTDAKiyErQ=
github.com/gomodule/redigo v1.8.9 h1:Sl3u+2BI/kk+VEatbj0scLdrFhjPmbxOc1myhDP41ws=
github.com/gomodule/redigo v1.8.9/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
github.com/redis/rueidis v1.0.19 h1:s65oWtotzlIFN8eMPhyYwxlwLR1lUdhza2KtWprKYSo=
github.com/redis/rueidis v1.0.19/go.mod h1:8B+r5wdnjwK3lTFml5VtxjzGOQAC+5UmujoD12pDrEo=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 h1:QVqDTf3h2WHt08YuiTGPZLls0Wq99X9bWd0Q5ZSBesM=
github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203/go.mod h1:oqN97ltKNihBbwlX8dLpwxCl3+HnXKV/R0e+sRLd9C8=
github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down

0 comments on commit c8765ba

Please sign in to comment.