Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

controller: improve the mechanism of disk removing #102

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 67 additions & 14 deletions pkg/controller/blockdevice/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,14 +188,41 @@ func Register(
return nil
}

// handleDeviceRemove stops provisioning the given block device to the current
// node by unprovisioning it from the Longhorn node resource.
// On failure it returns a wrapped error so the caller can record the
// DiskAddedToNode condition and requeue the device for retry.
func (c *Controller) handleDeviceRemove(device *diskv1.BlockDevice) error {
	logrus.Infof("Prepare to stop provisioning device %s to node %s", device.Name, c.NodeName)
	if err := c.unprovisionDeviceFromNode(device); err != nil {
		// Wrap once with full context; the previous code shadowed err and
		// logged the device name a second time inside the same message.
		wrapped := fmt.Errorf("failed to stop provisioning device %s to node %s: %w", device.Name, c.NodeName, err)
		logrus.Warnf("Removing disk error: %v", wrapped)
		return wrapped
	}
	return nil
}

// OnBlockDeviceChange watch the block device CR on change and performing disk operations
// like mounting the disks to a desired path via ext4
func (c *Controller) OnBlockDeviceChange(_ string, device *diskv1.BlockDevice) (*diskv1.BlockDevice, error) {
if device == nil || device.DeletionTimestamp != nil || device.Spec.NodeName != c.NodeName || device.Status.State == diskv1.BlockDeviceInactive {
if device == nil || device.DeletionTimestamp != nil || device.Spec.NodeName != c.NodeName {
return nil, nil
}

// handle remove device no matter inactive or corrupted, we will set `device.Spec.FileSystem.Provisioned` to false
if !device.Spec.FileSystem.Provisioned && device.Status.ProvisionPhase != diskv1.ProvisionPhaseUnprovisioned {
deviceCpy := device.DeepCopy()
if err := c.handleDeviceRemove(deviceCpy); err != nil {
diskv1.DiskAddedToNode.SetError(deviceCpy, "", err)
diskv1.DiskAddedToNode.SetStatusBool(deviceCpy, false)
c.Blockdevices.EnqueueAfter(c.Namespace, device.Name, jitterEnqueueDelay())
}
if !reflect.DeepEqual(device, deviceCpy) {
logrus.Debugf("Update block device %s after removing", device.Name)
return c.Blockdevices.Update(deviceCpy)
}
}

// corrupted device could be skipped if we do not set ForceFormatted or Repaired
if device.Status.State == diskv1.BlockDeviceInactive {
return nil, nil
}
if device.Status.DeviceStatus.FileSystem.Corrupted && !device.Spec.FileSystem.ForceFormatted && !device.Spec.FileSystem.Repaired {
return nil, nil
}
Expand Down Expand Up @@ -294,15 +321,6 @@ func (c *Controller) OnBlockDeviceChange(_ string, device *diskv1.BlockDevice) (
diskv1.DiskAddedToNode.SetStatusBool(deviceCpy, false)
c.Blockdevices.EnqueueAfter(c.Namespace, device.Name, jitterEnqueueDelay())
}
case !needProvision && device.Status.ProvisionPhase != diskv1.ProvisionPhaseUnprovisioned:
logrus.Infof("Prepare to stop provisioning device %s to node %s", device.Name, c.NodeName)
if err := c.unprovisionDeviceFromNode(deviceCpy); err != nil {
err := fmt.Errorf("failed to stop provisioning device %s to node %s: %w", device.Name, c.NodeName, err)
logrus.Error(err)
diskv1.DiskAddedToNode.SetError(deviceCpy, "", err)
diskv1.DiskAddedToNode.SetStatusBool(deviceCpy, false)
c.Blockdevices.EnqueueAfter(c.Namespace, device.Name, jitterEnqueueDelay())
}
}

if !reflect.DeepEqual(device, deviceCpy) {
Expand Down Expand Up @@ -535,6 +553,19 @@ func (c *Controller) unprovisionDeviceFromNode(device *diskv1.BlockDevice) error
diskv1.DiskAddedToNode.Message(device, msg)
}

removeDiskFromNode := func() error {
nodeCpy := node.DeepCopy()
delete(nodeCpy.Spec.Disks, device.Name)
if _, err := c.Nodes.Update(nodeCpy); err != nil {
return err
}
return nil
}

isValidateToDelete := func(lhDisk longhornv1.DiskSpec) bool {
return !lhDisk.AllowScheduling
}

diskToRemove, ok := node.Spec.Disks[device.Name]
if !ok {
logrus.Infof("disk %s not in disks of longhorn node %s/%s", device.Name, c.Namespace, c.NodeName)
Expand All @@ -543,19 +574,41 @@ func (c *Controller) unprovisionDeviceFromNode(device *diskv1.BlockDevice) error
}

isUnprovisioning := false
for _, tag := range diskToRemove.Tags {
for _, tag := range device.Status.Tags {
if tag == utils.DiskRemoveTag {
isUnprovisioning = true
break
}
}

// for inactive/corrupted disk, we could remove it from node directly
if isUnprovisioning && isValidateToDelete(diskToRemove) &&
(device.Status.State == diskv1.BlockDeviceInactive || device.Status.DeviceStatus.FileSystem.Corrupted) {
logrus.Infof("disk (%s) is inactive or corrupted, remove it from node directly", device.Name)
// handle mountpoint first
filesystem := c.BlockInfo.GetFileSystemInfoByDevPath(device.Status.DeviceStatus.DevPath)
if filesystem != nil && filesystem.MountPoint != "" {
timeout := 30 * time.Second
if err := utils.ForceUmountWithTimeout(filesystem.MountPoint, timeout); err != nil {
logrus.Warnf("Force umount %v error: %v", filesystem.MountPoint, err)
}
// reset related fields
c.updateDeviceFileSystem(device, device.Status.DeviceStatus.DevPath)
device.Spec.Tags = []string{}
device.Status.Tags = []string{}
}
// remove the disk from node
if err := removeDiskFromNode(); err != nil {
return err
}
updateProvisionPhaseUnprovisioned()
return nil
}

if isUnprovisioning {
if status, ok := node.Status.DiskStatus[device.Name]; ok && len(status.ScheduledReplica) == 0 {
// Unprovision finished. Remove the disk.
nodeCpy := node.DeepCopy()
delete(nodeCpy.Spec.Disks, device.Name)
if _, err := c.Nodes.Update(nodeCpy); err != nil {
if err := removeDiskFromNode(); err != nil {
return err
}
updateProvisionPhaseUnprovisioned()
Expand Down
5 changes: 5 additions & 0 deletions pkg/utils/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,24 @@ func execute(command string, args []string, timeout time.Duration) (string, erro
cmd := exec.Command(command, args...)

var output, stderr bytes.Buffer
cmdTimeout := false
cmd.Stdout = &output
cmd.Stderr = &stderr

timer := time.NewTimer(cmdTimeoutNone)
if timeout != cmdTimeoutNone {
// add timer to kill the process if timeout
timer = time.AfterFunc(timeout, func() {
cmdTimeout = true
cmd.Process.Kill()
})
}
defer timer.Stop()

if err := cmd.Run(); err != nil {
if cmdTimeout {
return "", errors.Wrapf(CmdTimeoutError, "timeout after %v: %v %v", timeout, command, args)
}
return "", errors.Wrapf(err, "failed to execute: %v %v, output %s, stderr %s",
command, args, output.String(), stderr.String())
}
Expand Down
40 changes: 40 additions & 0 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"
"sync"
"syscall"
"time"

"github.com/longhorn/longhorn-manager/util"
)
Expand All @@ -20,6 +21,8 @@ const (
DiskRemoveTag = "harvester-ndm-disk-remove"
)

var CmdTimeoutError error

var ext4MountOptions = strings.Join([]string{
"journal_checksum",
"journal_ioprio=0",
Expand All @@ -36,13 +39,15 @@ func IsHostProcMounted() (bool, error) {
return err == nil, nil
}

// GetFullDevPath returns the device path with the `/dev/` prefix prepended.
// An empty shortPath yields an empty result so callers can pass through
// unset device names safely.
func GetFullDevPath(shortPath string) string {
	if shortPath == "" {
		return ""
	}
	// Plain concatenation: cheaper and clearer than fmt.Sprintf for a
	// constant prefix plus one string.
	return "/dev/" + shortPath
}

// MatchesIgnoredCase checks if the item of string slice fully match the key with case-insensitive
func MatchesIgnoredCase(s []string, k string) bool {
for _, e := range s {
if strings.EqualFold(e, k) {
Expand All @@ -52,6 +57,7 @@ func MatchesIgnoredCase(s []string, k string) bool {
return false
}

// ContainsIgnoredCase checks if the item of string slice contains the key with case-insensitive
func ContainsIgnoredCase(s []string, k string) bool {
k = strings.ToLower(k)
for _, e := range s {
Expand All @@ -62,6 +68,8 @@ func ContainsIgnoredCase(s []string, k string) bool {
return false
}

// MakeExt4DiskFormatting formats the specified volume device to ext4 with the specified UUID
// return error if failed
func MakeExt4DiskFormatting(devPath, uuid string) error {
args := []string{"-F", devPath}
if uuid != "" {
Expand Down Expand Up @@ -121,6 +129,22 @@ func UmountDisk(path string) error {
return os.NewSyscallError("umount", err)
}

// ForceUmountWithTimeout force-unmounts the filesystem mounted at path.
// When the host /proc is mounted, `umount -f` is executed inside the host
// namespace and killed after timeout elapses; otherwise it falls back to a
// direct Unmount syscall with MNT_FORCE (the timeout does not apply there).
func ForceUmountWithTimeout(path string, timeout time.Duration) error {
	isHostProcMounted, err := IsHostProcMounted()
	if err != nil {
		return err
	}
	if isHostProcMounted {
		_, err := executeOnHostNamespaceWithTimeout("umount", []string{"-f", path}, timeout)
		return err
	}
	// Use the named constant instead of the magic value 1.
	if err := syscall.Unmount(path, syscall.MNT_FORCE); err != nil {
		return os.NewSyscallError("umount", err)
	}
	return nil
}

// mountExt4 mount the ext4 volume device to the specified path with readonly option
func mountExt4(device, path string, readonly bool) error {
var flags uintptr
flags = syscall.MS_RELATIME
Expand Down Expand Up @@ -148,6 +172,8 @@ func mountExt4OnHostNamespace(device, path string, readonly bool) error {
return err
}

// executeOnHostNamespace executes the command in the host namespace
// return the command result and error
func executeOnHostNamespace(cmd string, args []string) (string, error) {
ns := GetHostNamespacePath(util.HostProcPath)
executor, err := NewExecutorWithNS(ns)
Expand All @@ -157,11 +183,25 @@ func executeOnHostNamespace(cmd string, args []string) (string, error) {
return executor.Execute(cmd, args)
}

// executeOnHostNamespace executes the command with timeout value in the host namespace
// return the command result and error
func executeOnHostNamespaceWithTimeout(cmd string, args []string, cmdTimeout time.Duration) (string, error) {
Vicente-Cheng marked this conversation as resolved.
Show resolved Hide resolved
ns := GetHostNamespacePath(util.HostProcPath)
executor, err := NewExecutorWithNS(ns)
executor.SetTimeout(cmdTimeout)
if err != nil {
return "", err
}
return executor.Execute(cmd, args)
}

// IsFSCorrupted checks if the error is caused by a corrupted filesystem
func IsFSCorrupted(err error) bool {
errMsg := err.Error()
return strings.Contains(errMsg, "wrong fs type")
}

// IsSupportedFileSystem checks if the filesystem type is supported
func IsSupportedFileSystem(fsType string) bool {
if fsType == "ext4" || fsType == "xfs" {
return true
Expand Down
39 changes: 39 additions & 0 deletions tests/integration/test_1_disk_hotplug_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,45 @@ func (s *HotPlugTestSuite) Test_3_AddDuplicatedWWNDsik() {
require.Equal(s.T(), err, nil, "Get Blockdevices should not get error")
require.Equal(s.T(), s.curBusPath, curBlockdevice.Status.DeviceStatus.Details.BusPath, "Disk path should not replace by duplicated wwn disk")

// cleanup this disk
cmd = fmt.Sprintf("virsh detach-disk %s %s --live", hotplugTargetNodeName, "sdb")
_, _, err = doCommand(cmd)
require.Equal(s.T(), err, nil, "Running command `virsh detach-disk` should not get error")

// wait for controller handling
time.Sleep(5 * time.Second)
}

// Test_4_RemoveInactiveDisk verifies that detaching a disk from the VM marks
// the corresponding block device inactive, and that clearing
// Spec.FileSystem.Provisioned afterwards unprovisions and unmounts it.
func (s *HotPlugTestSuite) Test_4_RemoveInactiveDisk() {
	// remove disk dynamically
	cmd := fmt.Sprintf("virsh detach-disk %s %s --live", hotplugTargetNodeName, hotplugTargetDiskName)
	_, _, err := doCommand(cmd)
	require.Equal(s.T(), nil, err, "Running command `virsh detach-disk` should not get error")

	// wait for controller handling
	time.Sleep(5 * time.Second)

	// check disk status
	require.NotEqual(s.T(), "", s.targetDiskName, "target disk name should not be empty before we start hotplug (remove) test")
	bdi := s.clientSet.HarvesterhciV1beta1().BlockDevices("longhorn-system")
	curBlockdevice, err := bdi.Get(context.TODO(), s.targetDiskName, v1.GetOptions{})
	require.Equal(s.T(), nil, err, "Get Blockdevices should not get error")

	require.Equal(s.T(), diskv1.BlockDeviceInactive, curBlockdevice.Status.State, "Disk status should be inactive after we remove disk")

	// remove this inactive device from Harvester
	newBlockdevice := curBlockdevice.DeepCopy()
	newBlockdevice.Spec.FileSystem.Provisioned = false
	// check the update error instead of silently ignoring it
	_, err = bdi.Update(context.TODO(), newBlockdevice, v1.UpdateOptions{})
	require.Equal(s.T(), nil, err, "Update Blockdevice should not get error")

	// sleep 30 seconds to wait for the controller to handle; enqueue jitter is
	// between 7~13 seconds, so 30 seconds is enough for it to run twice
	time.Sleep(30 * time.Second)

	// check for the removed status
	curBlockdevice, err = bdi.Get(context.TODO(), s.targetDiskName, v1.GetOptions{})
	require.Equal(s.T(), nil, err, "Get Blockdevices should not get error before we want to check remove")
	require.Equal(s.T(), "", curBlockdevice.Status.DeviceStatus.FileSystem.MountPoint, "Mountpoint should be empty after we remove disk!")
	require.Equal(s.T(), diskv1.ProvisionPhaseUnprovisioned, curBlockdevice.Status.ProvisionPhase, "Block device provisionPhase should be Unprovisioned")
}

func doCommand(cmdString string) (string, string, error) {
Expand Down
Loading