Skip to content

Commit

Permalink
Pull request #52: Fix deadlock on SquirrelDB startup with Cassandra c…
Browse files Browse the repository at this point in the history
…luster

Merge in PRODUCT/squirreldb from bugfix/PRODUCT-2255-deadlock-on-cassandra-connection to master

* commit '8851fea1e794212cd5ade8047296034dbbb3613e':
  Use a separate lock for ConnectionObserver
  • Loading branch information
PierreF committed Feb 27, 2023
2 parents 0cdfe42 + 8851fea commit 68ab6b7
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 16 deletions.
18 changes: 10 additions & 8 deletions cassandra/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ type Connection struct {
cancel context.CancelFunc
wg sync.WaitGroup
wakeRunLoop chan interface{}
observer *connectObserver
lastConnectionEstablished time.Time
lastObservedError connectError
}

// New creates a new Cassandra session and returns whether the keyspace was created by this instance.
Expand Down Expand Up @@ -94,20 +94,23 @@ func New(ctx context.Context, options config.Cassandra, logger zerolog.Logger) (

runCtx, cancel := context.WithCancel(context.Background())

wakeRunLoop := make(chan interface{})

manager := &Connection{
logger: logger,
cluster: cluster,
sessionUserCount: make(map[int]int),
sessions: make(map[int]*gocql.Session),
cancel: cancel,
wakeRunLoop: make(chan interface{}),
wakeRunLoop: wakeRunLoop,
observer: &connectObserver{wakeRunLoop: wakeRunLoop},
}

manager.wg.Add(1)

go manager.run(runCtx) //nolint: contextcheck

cluster.ConnectObserver = connectObserver{connection: manager}
cluster.ConnectObserver = manager.observer

return manager, keyspaceCreated, nil
}
Expand Down Expand Up @@ -238,22 +241,21 @@ func (c *Connection) runOnce(ctx context.Context) bool {
c.l.Lock()

reopenConnection := false
lastObservedError := c.observer.GetAndClearLastObservation()

if err != nil {
c.logger.Info().Err(err).Msg("Cassandra connection is no longer valid, reopening one")

reopenConnection = true
} else if c.lastObservedError.err != nil && time.Since(c.lastConnectionEstablished) > 15*time.Minute {
} else if lastObservedError.err != nil && time.Since(c.lastConnectionEstablished) > 15*time.Minute {
c.logger.Info().
Err(c.lastObservedError.err).
Str("HostnameAndPort", c.lastObservedError.hostAndPort).
Err(lastObservedError.err).
Str("HostnameAndPort", lastObservedError.hostAndPort).
Msg("Observed connection and last reconnection is more than 15 minutes old")

reopenConnection = true
}

c.lastObservedError = connectError{}

return reopenConnection
}

Expand Down
34 changes: 26 additions & 8 deletions cassandra/connection/observer.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,49 @@
package connection

import "github.com/gocql/gocql"
import (
"sync"

"github.com/gocql/gocql"
"github.com/rs/zerolog"
)

// connectObserver records the most recent connection error passed to
// ObserveConnect and signals wakeRunLoop when one is seen.
// NOTE(review): this listing is a rendered diff, so removed and added field
// lines appear side by side; confirm which fields exist in the final struct.
type connectObserver struct {
// NOTE(review): looks like the pre-change back-reference to the owning
// Connection; confirm whether it is still present after this commit.
connection *Connection
// l guards lastObservedError.
l sync.Mutex
// lastObservedError holds the last error seen by ObserveConnect until
// GetAndClearLastObservation reads and resets it.
lastObservedError connectError
// logger receives the debug message emitted for each observed error.
logger zerolog.Logger
// wakeRunLoop gets a non-blocking nil send whenever an error is observed.
wakeRunLoop chan interface{}
}

// connectError pairs an observed connection error with the host it was
// reported for.
type connectError struct {
// err is the error taken from the observed connect message.
err error
// hostAndPort is the HostnameAndPort() value of the host in that message.
hostAndPort string
}

func (obs connectObserver) ObserveConnect(msg gocql.ObservedConnect) {
// GetAndClearLastObservation returns the last connection error stored on the
// observer and resets the stored value to the zero connectError. Both the read
// and the reset happen while holding the observer's own mutex, so a caller
// receives each recorded error at most once.
func (obs *connectObserver) GetAndClearLastObservation() connectError {
	obs.l.Lock()
	last := obs.lastObservedError
	obs.lastObservedError = connectError{}
	obs.l.Unlock()

	return last
}

func (obs *connectObserver) ObserveConnect(msg gocql.ObservedConnect) {
if msg.Err != nil {
obs.connection.l.Lock()
obs.connection.lastObservedError = connectError{
obs.l.Lock()
obs.lastObservedError = connectError{
err: msg.Err,
hostAndPort: msg.Host.HostnameAndPort(),
}
obs.connection.l.Unlock()
obs.l.Unlock()

select {
case obs.connection.wakeRunLoop <- nil:
case obs.wakeRunLoop <- nil:
default:
}

obs.connection.logger.Debug().
obs.logger.Debug().
Err(msg.Err).
Str("HostnameAndPort", msg.Host.HostnameAndPort()).
Msg("ObserveConnect see an error")
Expand Down

0 comments on commit 68ab6b7

Please sign in to comment.