changing node Lock timeout to 60sec and LUKS command timeout to 30sec (#1576)
bhatnag authored Apr 17, 2024
1 parent 1fdbe7b commit e8a71e6
Showing 3 changed files with 108 additions and 17 deletions.
53 changes: 37 additions & 16 deletions frontend/csi/node_server.go
@@ -37,6 +37,7 @@ const (
defaultNodeReconciliationPeriod = 1 * time.Minute
maximumNodeReconciliationJitter = 5000 * time.Millisecond
nvmeMaxFlushWaitDuration = 6 * time.Minute
csiNodeLockTimeout = 60 * time.Second
)

var (
@@ -51,20 +52,33 @@ var (
NVMeNamespacesFlushRetry = make(map[string]time.Time)
)

func attemptLock(ctx context.Context, lockContext string, lockTimeout time.Duration) bool {
startTime := time.Now()
utils.Lock(ctx, lockContext, lockID)
// Fail if this request has already waited in the queue too long, so we abort before the kubelet's 120s gRPC timeout
if time.Since(startTime) > lockTimeout {
Logc(ctx).Debugf("Request spent more than %v in the queue and timed out", lockTimeout)
return false
}
return true
}

func (p *Plugin) NodeStageVolume(
ctx context.Context, req *csi.NodeStageVolumeRequest,
) (*csi.NodeStageVolumeResponse, error) {
ctx = SetContextWorkflow(ctx, WorkflowNodeStage)
ctx = GenerateRequestContextForLayer(ctx, LogLayerCSIFrontend)

lockContext := "NodeStageVolume-" + req.GetVolumeId()
utils.Lock(ctx, lockContext, lockID)
defer utils.Unlock(ctx, lockContext, lockID)

fields := LogFields{"Method": "NodeStageVolume", "Type": "CSI_Node"}
Logc(ctx).WithFields(fields).Debug(">>>> NodeStageVolume")
defer Logc(ctx).WithFields(fields).Debug("<<<< NodeStageVolume")

lockContext := "NodeStageVolume-" + req.GetVolumeId()
defer utils.Unlock(ctx, lockContext, lockID)

if !attemptLock(ctx, lockContext, csiNodeLockTimeout) {
return nil, status.Error(codes.Aborted, "request waited too long for the lock")
}
switch req.PublishContext["protocol"] {
case string(tridentconfig.File):
if req.PublishContext["filesystemType"] == utils.SMB {
@@ -94,10 +108,6 @@ func (p *Plugin) NodeUnstageVolume(
func (p *Plugin) nodeUnstageVolume(
ctx context.Context, req *csi.NodeUnstageVolumeRequest, force bool,
) (*csi.NodeUnstageVolumeResponse, error) {
lockContext := "NodeUnstageVolume-" + req.GetVolumeId()
utils.Lock(ctx, lockContext, lockID)
defer utils.Unlock(ctx, lockContext, lockID)

fields := LogFields{
"Method": "NodeUnstageVolume",
"Type": "CSI_Node",
@@ -106,6 +116,12 @@ func (p *Plugin) nodeUnstageVolume(
Logc(ctx).WithFields(fields).Debug(">>>> NodeUnstageVolume")
defer Logc(ctx).WithFields(fields).Debug("<<<< NodeUnstageVolume")

lockContext := "NodeUnstageVolume-" + req.GetVolumeId()
defer utils.Unlock(ctx, lockContext, lockID)

if !attemptLock(ctx, lockContext, csiNodeLockTimeout) {
return nil, status.Error(codes.Aborted, "request waited too long for the lock")
}
// Empty strings for either of these arguments are required by CSI Sanity to return an error.
if req.VolumeId == "" {
msg := "nodeUnstageVolume was called, but no VolumeID was provided in the request"
@@ -179,14 +195,16 @@ func (p *Plugin) NodePublishVolume(
ctx = SetContextWorkflow(ctx, WorkflowNodePublish)
ctx = GenerateRequestContextForLayer(ctx, LogLayerCSIFrontend)

lockContext := "NodePublishVolume-" + req.GetVolumeId()
utils.Lock(ctx, lockContext, lockID)
defer utils.Unlock(ctx, lockContext, lockID)

fields := LogFields{"Method": "NodePublishVolume", "Type": "CSI_Node"}
Logc(ctx).WithFields(fields).Debug(">>>> NodePublishVolume")
defer Logc(ctx).WithFields(fields).Debug("<<<< NodePublishVolume")

lockContext := "NodePublishVolume-" + req.GetVolumeId()
defer utils.Unlock(ctx, lockContext, lockID)

if !attemptLock(ctx, lockContext, csiNodeLockTimeout) {
return nil, status.Error(codes.Aborted, "request waited too long for the lock")
}
switch req.PublishContext["protocol"] {
case string(tridentconfig.File):
trackingInfo, err := p.nodeHelper.ReadTrackingInfo(ctx, req.VolumeId)
@@ -220,14 +238,17 @@ func (p *Plugin) NodeUnpublishVolume(
ctx = SetContextWorkflow(ctx, WorkflowNodeUnpublish)
ctx = GenerateRequestContextForLayer(ctx, LogLayerCSIFrontend)

lockContext := "NodeUnpublishVolume-" + req.GetVolumeId()
utils.Lock(ctx, lockContext, lockID)
defer utils.Unlock(ctx, lockContext, lockID)

fields := LogFields{"Method": "NodeUnpublishVolume", "Type": "CSI_Node"}
Logc(ctx).WithFields(fields).Debug(">>>> NodeUnpublishVolume")
defer Logc(ctx).WithFields(fields).Debug("<<<< NodeUnpublishVolume")

lockContext := "NodeUnpublishVolume-" + req.GetVolumeId()
defer utils.Unlock(ctx, lockContext, lockID)

if !attemptLock(ctx, lockContext, csiNodeLockTimeout) {
return nil, status.Error(codes.Aborted, "request waited too long for the lock")
}

if req.GetVolumeId() == "" {
return nil, status.Error(codes.InvalidArgument, "no volume ID provided")
}
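Note on the locking pattern introduced above: utils.Lock blocks until the lock is acquired, so a false return from attemptLock means the lock was eventually obtained, but only after the caller had already waited past lockTimeout. Each handler therefore registers its deferred utils.Unlock before calling attemptLock, so the lock is released even on the abort path. Below is a minimal sketch of that caller pattern; the handler name and plain error return are hypothetical stand-ins for the CSI handlers and response types shown in the diff above.

// Sketch only: nodeSomeOperation is a hypothetical handler; the real handlers
// return the corresponding CSI response types.
func (p *Plugin) nodeSomeOperation(ctx context.Context, volumeID string) error {
	lockContext := "NodeSomeOperation-" + volumeID
	// Register the unlock first: the lock is held even when attemptLock
	// reports a timeout, so it must still be released.
	defer utils.Unlock(ctx, lockContext, lockID)

	if !attemptLock(ctx, lockContext, csiNodeLockTimeout) {
		// Abort early so kubelet can retry instead of hitting its own 120s gRPC timeout.
		return status.Error(codes.Aborted, "request waited too long for the lock")
	}

	// ... do the node-side work while holding the lock ...
	return nil
}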
70 changes: 70 additions & 0 deletions frontend/csi/node_server_test.go
@@ -5,6 +5,7 @@ package csi
import (
"context"
"fmt"
"sync"
"testing"
"time"

@@ -994,3 +995,72 @@ func TestFixNVMeSessions(t *testing.T) {
// Cleanup of global objects.
publishedNVMeSessions.RemoveNVMeSession(subsystem1.NQN)
}

// TestAttemptLock_Failure checks that when the first request holds the lock for a long time,
// the second request times out and returns false while attempting to acquire the lock.
// This is done by letting the first request acquire the lock and starting another goroutine
// that also tries to take the lock, with a timeout of 200ms. The first request relinquishes the
// lock after 500ms. By the time the second request gets the lock, its lock timeout has expired
// and it reports a failure.
func TestAttemptLock_Failure(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)

ctx := context.Background()
lockContext := "fakeLockContext-req1"
lockTimeout := 200 * time.Millisecond
// first request takes the lock
expected := attemptLock(ctx, lockContext, lockTimeout)

// start the second request so that it competes for the lock
go func() {
defer wg.Done()
ctx := context.Background()
lockContext := "fakeLockContext-req2"
expected := attemptLock(ctx, lockContext, lockTimeout)

assert.False(t, expected)
utils.Unlock(ctx, lockContext, lockID)
}()
// first request goes to sleep holding the lock
if expected {
time.Sleep(500 * time.Millisecond)
}
utils.Unlock(ctx, lockContext, lockID)
wg.Wait()
}

// TestAttemptLock_Success checks that when the first request holds the lock only for a short time,
// the second request does not time out and acquires the lock after the first request releases it.
// This is done by letting the first request acquire the lock and starting another goroutine
// that also tries to take the lock, with a timeout of 5s. The first request relinquishes the lock
// after 200ms. The second request gets the lock before its lock timeout has expired and reports success.
func TestAttemptLock_Success(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)

ctx := context.Background()
lockContext := "fakeLockContext-req1"
lockTimeout := 500 * time.Millisecond
// first request takes the lock
expected := attemptLock(ctx, lockContext, lockTimeout)

// start the second request so that it competes for the lock
go func() {
defer wg.Done()
ctx := context.Background()
lockContext := "fakeLockContext-req2"
lockTimeout := 5 * time.Second

expected := attemptLock(ctx, lockContext, lockTimeout)

assert.True(t, expected)
utils.Unlock(ctx, lockContext, lockID)
}()
// first request goes to sleep holding the lock
if expected {
time.Sleep(200 * time.Millisecond)
}
utils.Unlock(ctx, lockContext, lockID)
wg.Wait()
}
2 changes: 1 addition & 1 deletion utils/devices_linux.go
@@ -24,7 +24,7 @@ import (
)

const (
luksCommandTimeout time.Duration = time.Second * 300
luksCommandTimeout time.Duration = time.Second * 30
luksCypherMode = "aes-xts-plain64"
luksType = "luks2"
// Return code for "no permission (bad passphrase)" from cryptsetup command
