Skip to content

Commit

Permalink
Avoid calling Close() or Shutdown() more than once
Browse files Browse the repository at this point in the history
  • Loading branch information
thrawn01 committed Nov 18, 2024
1 parent 26ce571 commit 668b6c8
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 6 deletions.
10 changes: 4 additions & 6 deletions gubernator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/mailgun/errors"
Expand Down Expand Up @@ -49,7 +50,7 @@ type V1Instance struct {
peerMutex sync.RWMutex
log FieldLogger
conf Config
isClosed bool
isClosed atomic.Bool
workerPool *WorkerPool
}

Expand Down Expand Up @@ -149,16 +150,14 @@ func NewV1Instance(conf Config) (s *V1Instance, err error) {
}

func (s *V1Instance) Close() (err error) {
ctx := context.Background()

if s.isClosed {
if !s.isClosed.CompareAndSwap(false, true) {
return nil
}

s.global.Close()

if s.conf.Loader != nil {
err = s.workerPool.Store(ctx)
err = s.workerPool.Store(context.Background())
if err != nil {
s.log.WithError(err).
Error("Error in workerPool.Store")
Expand All @@ -173,7 +172,6 @@ func (s *V1Instance) Close() (err error) {
return errors.Wrap(err, "Error in workerPool.Close")
}

s.isClosed = true
return nil
}

Expand Down
5 changes: 5 additions & 0 deletions peer_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type PeerClient struct {
queue chan *request
queueClosed atomic.Bool
lastErrs *collections.LRUCache
isShutdown atomic.Bool

wgMutex sync.RWMutex
wg sync.WaitGroup // Monitor the number of in-flight requests. GUARDED_BY(wgMutex)
Expand Down Expand Up @@ -406,6 +407,10 @@ func (c *PeerClient) sendBatch(ctx context.Context, queue []*request) {
// Shutdown waits until all outstanding requests have finished or the context is cancelled.
// Then it closes the grpc connection.
func (c *PeerClient) Shutdown(ctx context.Context) error {
if !c.isShutdown.CompareAndSwap(false, true) {
return nil
}

// ensure we don't leak goroutines, even if the Shutdown times out
defer c.conn.Close()

Expand Down
4 changes: 4 additions & 0 deletions workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type WorkerPool struct {
hashRingStep uint64
conf *Config
done chan struct{}
isClosed atomic.Bool
}

type Worker struct {
Expand Down Expand Up @@ -155,6 +156,9 @@ func (ph *hasher) ComputeHash63(input string) uint64 {
}

func (p *WorkerPool) Close() error {
if !p.isClosed.CompareAndSwap(false, true) {
return nil
}
close(p.done)
return nil
}
Expand Down

0 comments on commit 668b6c8

Please sign in to comment.