Give up manager role if DCS is reachable by unavailable primary
secwall committed Jul 9, 2024
1 parent 5f3b965 commit fa72558
Showing 6 changed files with 199 additions and 3 deletions.
5 changes: 2 additions & 3 deletions internal/app/app.go
@@ -22,6 +22,7 @@ type App struct {
ctx context.Context
mode appMode
nodeFailTime map[string]time.Time
splitTime map[string]time.Time
state appState
critical atomic.Value
logger *slog.Logger
@@ -72,9 +73,6 @@ func NewApp(configFile, logLevel string) (*App, error) {
return nil, err
}
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: logLevelN}))
if err != nil {
return nil, err
}
mode, err := parseMode(conf.Mode)
if err != nil {
return nil, err
@@ -83,6 +81,7 @@
ctx: baseContext(),
mode: mode,
nodeFailTime: make(map[string]time.Time),
splitTime: make(map[string]time.Time),
state: stateInit,
logger: logger,
config: conf,
39 changes: 39 additions & 0 deletions internal/app/manager.go
@@ -1,6 +1,7 @@
package app

import (
"context"
"fmt"
"time"

@@ -136,9 +137,47 @@ func (app *App) stateManager() appState {
}
if !shardState[master].PingOk {
app.logger.Error(fmt.Sprintf("Master %s probably failed, do not perform any kind of repair", master))
if master != app.config.Hostname && shardStateDcs[master].PingOk {
if app.splitTime[master].IsZero() {
app.splitTime[master] = time.Now()
}
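// Give the master a grace period (FailoverTimeout) before concluding that this node is the isolated one.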
if app.config.Redis.FailoverTimeout > 0 {
failedTime := time.Since(app.splitTime[master])
if failedTime < app.config.Redis.FailoverTimeout {
app.logger.Error(fmt.Sprintf("According to DCS %s is still alive, will wait for %v before giving up on manager role",
master, app.config.Redis.FailoverTimeout-failedTime))
return stateManager
}
}
app.logger.Error(fmt.Sprintf("According to DCS master %s is alive, but we see it as failed. Giving up on manager role", master))
delete(app.splitTime, master)
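// Step down: release the manager lock and wait for another node to take it over.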
app.dcs.ReleaseLock(pathManagerLock)
waitCtx, cancel := context.WithTimeout(app.ctx, app.config.Redis.FailoverTimeout)
defer cancel()
ticker := time.NewTicker(app.config.TickInterval)
var manager dcs.LockOwner
Out:
for {
select {
case <-ticker.C:
err = app.dcs.Get(pathManagerLock, &manager)
if err != nil {
app.logger.Error(fmt.Sprintf("Failed to get %s", pathManagerLock), "error", err)
} else if manager.Hostname != app.config.Hostname {
app.logger.Info(fmt.Sprintf("New manager: %s", manager.Hostname))
break Out
}
case <-waitCtx.Done():
app.logger.Error("No node took manager lock for failover timeout")
break Out
}
}
return stateCandidate
}
return stateManager
}
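// Master answers pings: clear failure and split-brain timers and proceed with shard repair.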
delete(app.nodeFailTime, master)
delete(app.splitTime, master)
app.repairShard(shardState, activeNodes, master)

if updateActive {
51 changes: 51 additions & 0 deletions tests/features/06_cluster_lost.feature
@@ -51,3 +51,54 @@ Feature: Cluster mode survives dcs conn loss
And host "redis2" is attached to the network
And host "redis3" is attached to the network
Then redis host "redis1" should become available within "60" seconds

Scenario: Cluster mode partially partitioned manager gives up on manager role
Given clustered shard is up and running
Then redis host "redis1" should be master
And redis host "redis2" should become replica of "redis1" within "15" seconds
And replication on redis host "redis2" should run fine within "15" seconds
And redis host "redis3" should become replica of "redis1" within "15" seconds
And replication on redis host "redis3" should run fine within "15" seconds
And zookeeper node "/test/active_nodes" should match json_exactly within "30" seconds
"""
["redis1","redis2","redis3"]
"""
When I run command on host "redis1" with timeout "20" seconds
"""
supervisorctl stop rdsync
"""
Then command return code should be "0"
And zookeeper node "/test/manager" should match regexp within "30" seconds
"""
.*redis[23].*
"""
When I run command on host "redis1" with timeout "20" seconds
"""
supervisorctl start rdsync
"""
When I get zookeeper node "/test/manager"
And I save zookeeper query result as "new_manager"
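# Blocking port 6379 on the new manager keeps DCS reachable but cuts its redis connectivity, simulating a partial partition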
And port "6379" on host "{{.new_manager.hostname}}" is blocked
And I wait for "60" seconds
Then redis host "redis1" should be master
When I run command on host "{{.new_manager.hostname}}"
"""
grep ERROR /var/log/rdsync.log
"""
Then command output should match regexp
"""
.*Giving up on manager role.*
"""
When I run command on host "{{.new_manager.hostname}}"
"""
grep INFO /var/log/rdsync.log
"""
Then command output should match regexp
"""
.*New manager.*
"""
When port "6379" on host "{{.new_manager.hostname}}" is unblocked
Then zookeeper node "/test/active_nodes" should match json_exactly within "30" seconds
"""
["redis1","redis2","redis3"]
"""
51 changes: 51 additions & 0 deletions tests/features/06_sentinel_lost.feature
@@ -51,3 +51,54 @@ Feature: Sentinel mode survives dcs conn loss
And host "redis2" is attached to the network
And host "redis3" is attached to the network
Then redis host "redis1" should become available within "60" seconds

Scenario: Sentinel mode partially partitioned manager gives up on manager role
Given sentinel shard is up and running
Then redis host "redis1" should be master
And redis host "redis2" should become replica of "redis1" within "15" seconds
And replication on redis host "redis2" should run fine within "15" seconds
And redis host "redis3" should become replica of "redis1" within "15" seconds
And replication on redis host "redis3" should run fine within "15" seconds
And zookeeper node "/test/active_nodes" should match json_exactly within "30" seconds
"""
["redis1","redis2","redis3"]
"""
When I run command on host "redis1" with timeout "20" seconds
"""
supervisorctl stop rdsync
"""
Then command return code should be "0"
And zookeeper node "/test/manager" should match regexp within "30" seconds
"""
.*redis[23].*
"""
When I run command on host "redis1" with timeout "20" seconds
"""
supervisorctl start rdsync
"""
When I get zookeeper node "/test/manager"
And I save zookeeper query result as "new_manager"
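# Blocking port 6379 on the new manager keeps DCS reachable but cuts its redis connectivity, simulating a partial partition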
And port "6379" on host "{{.new_manager.hostname}}" is blocked
And I wait for "60" seconds
Then redis host "redis1" should be master
When I run command on host "{{.new_manager.hostname}}"
"""
grep ERROR /var/log/rdsync.log
"""
Then command output should match regexp
"""
.*Giving up on manager role.*
"""
When I run command on host "{{.new_manager.hostname}}"
"""
grep INFO /var/log/rdsync.log
"""
Then command output should match regexp
"""
.*New manager.*
"""
When port "6379" on host "{{.new_manager.hostname}}" is unblocked
Then zookeeper node "/test/active_nodes" should match json_exactly within "30" seconds
"""
["redis1","redis2","redis3"]
"""
10 changes: 10 additions & 0 deletions tests/rdsync_test.go
@@ -529,6 +529,14 @@ func (tctx *testContext) stepHostIsAttachedToTheNetwork(host string) error {
return tctx.composer.AttachToNet(host)
}

func (tctx *testContext) stepPortOnHostIsBlocked(port int, host string) error {
return tctx.composer.BlockPort(host, port)
}

func (tctx *testContext) stepPortOnHostIsUnBlocked(port int, host string) error {
return tctx.composer.UnBlockPort(host, port)
}

func (tctx *testContext) stepHostIsAdded(host string) error {
err := tctx.composer.Start(host)
if err != nil {
@@ -1010,6 +1018,8 @@ func InitializeScenario(s *godog.ScenarioContext) {
s.Step(`^host "([^"]*)" is detached from the network$`, tctx.stepHostIsDetachedFromTheNetwork)
s.Step(`^host "([^"]*)" is started$`, tctx.stepHostIsStarted)
s.Step(`^host "([^"]*)" is attached to the network$`, tctx.stepHostIsAttachedToTheNetwork)
s.Step(`^port "(\d+)" on host "([^"]*)" is blocked$`, tctx.stepPortOnHostIsBlocked)
s.Step(`^port "(\d+)" on host "([^"]*)" is unblocked$`, tctx.stepPortOnHostIsUnBlocked)
s.Step(`^host "([^"]*)" is added`, tctx.stepHostIsAdded)
s.Step(`^host "([^"]*)" is deleted$`, tctx.stepHostIsDeleted)

46 changes: 46 additions & 0 deletions tests/testutil/docker_composer.go
@@ -43,6 +43,10 @@ type Composer interface {
DetachFromNet(service string) error
// Attaches container/VM to network
AttachToNet(service string) error
// Blocks port on host
BlockPort(service string, port int) error
// Unblocks port on host
UnBlockPort(service string, port int) error
// Executes command inside container/VM with given timeout.
// Returns command retcode and output (stdout and stderr are mixed)
RunCommand(service, cmd string, timeout time.Duration) (retcode int, output string, err error)
@@ -334,6 +338,48 @@ func (dc *DockerComposer) DetachFromNet(service string) error {
return nil
}

// BlockPort blocks port for host
func (dc *DockerComposer) BlockPort(service string, port int) error {
_, ok := dc.containers[service]
if !ok {
return fmt.Errorf("no such service: %s", service)
}
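// Drop traffic in both directions: inbound to the local port and outbound towards the same port on other hosts, for IPv4 and IPv6.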
cmds := []string{
fmt.Sprintf("iptables -A INPUT -i eth0 -p tcp --dport %d -j DROP", port),
fmt.Sprintf("iptables -A OUTPUT -o eth0 -p tcp --dport %d -j DROP", port),
fmt.Sprintf("ip6tables -A INPUT -i eth0 -p tcp --dport %d -j DROP", port),
fmt.Sprintf("ip6tables -A OUTPUT -o eth0 -p tcp --dport %d -j DROP", port),
}
for _, cmd := range cmds {
_, _, err := dc.RunCommand(service, cmd, defaultDockerTimeout)
if err != nil {
return err
}
}
return nil
}

// UnBlockPort removes blocking rules for port on host
func (dc *DockerComposer) UnBlockPort(service string, port int) error {
_, ok := dc.containers[service]
if !ok {
return fmt.Errorf("no such service: %s", service)
}
cmds := []string{
fmt.Sprintf("iptables -D INPUT -i eth0 -p tcp --dport %d -j DROP", port),
fmt.Sprintf("iptables -D OUTPUT -o eth0 -p tcp --dport %d -j DROP", port),
fmt.Sprintf("ip6tables -D INPUT -i eth0 -p tcp --dport %d -j DROP", port),
fmt.Sprintf("ip6tables -D OUTPUT -o eth0 -p tcp --dport %d -j DROP", port),
}
for _, cmd := range cmds {
_, _, err := dc.RunCommand(service, cmd, defaultDockerTimeout)
if err != nil {
return err
}
}
return nil
}

func newUntarReaderCloser(reader io.ReadCloser) (io.ReadCloser, error) {
tarReader := tar.NewReader(reader)
_, err := tarReader.Next()
