Implement volume health monitoring feature to detect abnormal volumes #819

Open · wants to merge 1 commit into master
18 changes: 17 additions & 1 deletion cmd/directpv/node-server.go
@@ -20,6 +20,7 @@ import (
"context"
"errors"
"os"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/minio/directpv/pkg/consts"
@@ -33,7 +34,11 @@ import (
"k8s.io/klog/v2"
)

var metricsPort = consts.MetricsPort
var (
metricsPort = consts.MetricsPort
volumeHealthMonitorInterval = 10 * time.Minute
enableVolumeHealthMonitor bool
)

var nodeServerCmd = &cobra.Command{
Use: consts.NodeServerName,
@@ -56,6 +61,8 @@ var nodeServerCmd = &cobra.Command{

func init() {
nodeServerCmd.PersistentFlags().IntVar(&metricsPort, "metrics-port", metricsPort, "Metrics port at "+consts.AppPrettyName+" exports metrics data")
nodeServerCmd.PersistentFlags().BoolVar(&enableVolumeHealthMonitor, "enable-volume-health-monitor", enableVolumeHealthMonitor, "Enable volume health monitoring")
nodeServerCmd.PersistentFlags().DurationVar(&volumeHealthMonitorInterval, "volume-health-monitor-interval", volumeHealthMonitorInterval, "Interval for volume health monitoring as a duration. Example: '20m', '1h'")
}

func startNodeServer(ctx context.Context) error {
@@ -114,6 +121,15 @@ func startNodeServer(ctx context.Context) error {
}
}()

if enableVolumeHealthMonitor {
go func() {
if err := volume.RunHealthMonitor(ctx, nodeID, volumeHealthMonitorInterval); err != nil {
klog.ErrorS(err, "unable to run volume health monitor")
errCh <- err
}
}()
}

return <-errCh
}
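When enabled, the monitor runs as its own goroutine in the node server and feeds the shared `errCh`, so a monitor failure shuts the server down just like a metrics or gRPC server failure would. `volume.RunHealthMonitor` itself is not part of this diff; below is a minimal sketch of the ticker-driven shape it plausibly takes. The body, the `checkVolumes` helper, and the plain `string` node ID are assumptions, not the PR's code:

```go
// Hypothetical sketch only: the real volume.RunHealthMonitor lives in
// pkg/volume and its body is not shown in this diff.
package volume

import (
	"context"
	"time"
)

// RunHealthMonitor periodically checks the health of this node's volumes
// until the context is cancelled.
func RunHealthMonitor(ctx context.Context, nodeID string, interval time.Duration) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			// checkVolumes is assumed here: it would list the volumes served
			// by this node and set or reset their Error condition based on
			// staging/target mount checks.
			if err := checkVolumes(ctx, nodeID); err != nil {
				return err
			}
		}
	}
}

func checkVolumes(ctx context.Context, nodeID string) error {
	// Placeholder: compare each volume's recorded mounts against the host
	// mount table and update conditions accordingly.
	return nil
}
```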

40 changes: 23 additions & 17 deletions cmd/kubectl-directpv/install.go
@@ -40,21 +40,22 @@ import (
)

var (
image = consts.AppName + ":" + Version
registry = "quay.io"
org = "minio"
nodeSelectorArgs = []string{}
tolerationArgs = []string{}
seccompProfile = ""
apparmorProfile = ""
imagePullSecrets = []string{}
nodeSelector map[string]string
tolerations []corev1.Toleration
k8sVersion = "1.27.0"
kubeVersion *version.Version
legacyFlag bool
declarativeFlag bool
openshiftFlag bool
image = consts.AppName + ":" + Version
registry = "quay.io"
org = "minio"
nodeSelectorArgs = []string{}
tolerationArgs = []string{}
seccompProfile = ""
apparmorProfile = ""
imagePullSecrets = []string{}
nodeSelector map[string]string
tolerations []corev1.Toleration
k8sVersion = "1.27.0"
kubeVersion *version.Version
legacyFlag bool
declarativeFlag bool
openshiftFlag bool
enableVolumeHealthMonitor bool
)

var installCmd = &cobra.Command{
@@ -82,7 +83,10 @@ var installCmd = &cobra.Command{
$ kubectl {PLUGIN_NAME} install --apparmor-profile directpv

7. Install DirectPV with seccomp profile
$ kubectl {PLUGIN_NAME} install --seccomp-profile profiles/seccomp.json`,
$ kubectl {PLUGIN_NAME} install --seccomp-profile profiles/seccomp.json

8. Install DirectPV with volume health monitoring enabled
$ kubectl {PLUGIN_NAME} install --enable-volume-health-monitoring`,
`{PLUGIN_NAME}`,
consts.AppName,
),
@@ -123,6 +127,7 @@ func init() {
installCmd.PersistentFlags().BoolVar(&declarativeFlag, "declarative", declarativeFlag, "Output YAML for declarative installation")
installCmd.PersistentFlags().MarkHidden("declarative")
installCmd.PersistentFlags().BoolVar(&openshiftFlag, "openshift", openshiftFlag, "Use OpenShift specific installation")
installCmd.PersistentFlags().BoolVar(&enableVolumeHealthMonitor, "enable-volume-health-monitoring", enableVolumeHealthMonitor, "Enable volume health monitoring")
}

func validateNodeSelectorArgs() error {
@@ -305,8 +310,9 @@ func installMain(ctx context.Context) {
}
}
}
args.Declarative = declarativeFlag
args.Openshift = openshiftFlag
args.Declarative = declarativeFlag
args.EnableVolumeHealthMonitor = enableVolumeHealthMonitor

var failed bool
var installedComponents []installer.Component
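The new `EnableVolumeHealthMonitor` field on the installer args is consumed inside `pkg/installer`, which this diff does not show. A hedged sketch of the likely wiring; only the flag name and the field come from this diff, everything else is assumed:

```go
// Hypothetical installer-side wiring; pkg/installer is not part of this diff.
package installer

// Args stands in for the installer's argument struct; only the
// EnableVolumeHealthMonitor field is known from this PR.
type Args struct {
	EnableVolumeHealthMonitor bool
}

// nodeServerArgs forwards the install-time choice to the node server's
// --enable-volume-health-monitor flag (flag name taken from node-server.go).
func nodeServerArgs(args Args) []string {
	serverArgs := []string{"node-server"} // illustrative base args
	if args.EnableVolumeHealthMonitor {
		serverArgs = append(serverArgs, "--enable-volume-health-monitor")
	}
	return serverArgs
}
```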
2 changes: 2 additions & 0 deletions cmd/kubectl-directpv/list_volumes.go
@@ -241,6 +241,8 @@ func listVolumesMain(ctx context.Context) {
status = "Released"
case volume.IsDriveLost():
status = "Lost"
case volume.HasError():
status = "Error"
case volume.IsPublished():
status = "Bounded"
}
30 changes: 17 additions & 13 deletions docs/command-reference.md
@@ -53,19 +53,20 @@ USAGE:
directpv install [flags]

FLAGS:
--node-selector strings Select the storage nodes using labels (KEY=VALUE,..)
--tolerations strings Set toleration labels on the storage nodes (KEY[=VALUE]:EFFECT,..)
--registry string Name of container registry (default "quay.io")
--org string Organization name in the registry (default "minio")
--image string Name of the DirectPV image (default "directpv:v4.0.6")
--image-pull-secrets strings Image pull secrets for DirectPV images (SECRET1,..)
--apparmor-profile string Set path to Apparmor profile
--seccomp-profile string Set path to Seccomp profile
-o, --output string Generate installation manifest. One of: yaml|json
--kube-version string Select the kubernetes version for manifest generation (default "1.27.0")
--legacy Enable legacy mode (Used with '-o')
--openshift Use OpenShift specific installation
-h, --help help for install
--node-selector strings Select the storage nodes using labels (KEY=VALUE,..)
--tolerations strings Set toleration labels on the storage nodes (KEY[=VALUE]:EFFECT,..)
--registry string Name of container registry (default "quay.io")
--org string Organization name in the registry (default "minio")
--image string Name of the DirectPV image (default "directpv:0.0.0-dev")
--image-pull-secrets strings Image pull secrets for DirectPV images (SECRET1,..)
--apparmor-profile string Set path to Apparmor profile
--seccomp-profile string Set path to Seccomp profile
-o, --output string Generate installation manifest. One of: yaml|json
--kube-version string Select the kubernetes version for manifest generation (default "1.27.0")
--legacy Enable legacy mode (Used with '-o')
--openshift Use OpenShift specific installation
--enable-volume-health-monitoring Enable volume health monitoring
-h, --help help for install

GLOBAL FLAGS:
--kubeconfig string Path to the kubeconfig file to use for CLI requests
@@ -92,6 +93,9 @@ EXAMPLES:

7. Install DirectPV with seccomp profile
$ kubectl directpv install --seccomp-profile profiles/seccomp.json

8. Install DirectPV with volume health monitoring enabled
$ kubectl directpv install --enable-volume-health-monitoring
```

## `discover` command
12 changes: 12 additions & 0 deletions docs/monitoring.md
@@ -79,3 +79,15 @@ scrape_configs:
action: replace
target_label: kubernetes_name
```

# Monitoring volume health

DirectPV monitors volume health and reports abnormalities as events on PVCs and Pods. Currently volume mounts are monitored; more checks will be added later. This feature builds on the Kubernetes [volume health monitoring CSI feature](https://kubernetes.io/docs/concepts/storage/volume-health-monitoring/).

To enable volume health monitoring, pass the `--enable-volume-health-monitoring` flag during installation. The `CSIVolumeHealth` feature gate must also be enabled in the cluster.

For private registries, note that the following image is required to enable volume health monitoring:

```
quay.io/minio/csi-external-health-monitor-controller:v0.10.0
```

Review thread on this image:

@Praveenrajmani (Collaborator, Author) commented on Sep 25, 2023:
Volume health monitoring is not enabled by default, so a fresh installation does not need this image. Users need it only when they enable volume health monitoring.

Member reply:
Users commonly miss pulling this image when enabling the feature. There is no harm in carrying an unused csi-external-health-monitor-controller image; with it present, health monitoring works seamlessly when enabled instead of failing.
11 changes: 8 additions & 3 deletions pkg/apis/directpv.min.io/types/types.go
@@ -120,23 +120,28 @@ type VolumeConditionType string

// Enum value of VolumeConditionType type.
const (
VolumeConditionTypeLost VolumeConditionType = "Lost"
VolumeConditionTypeLost VolumeConditionType = "Lost"
VolumeConditionTypeError VolumeConditionType = "Error"
)

// VolumeConditionReason denotes volume reason. Allows maximum upto 1024 chars.
type VolumeConditionReason string

// Enum values of VolumeConditionReason type.
const (
VolumeConditionReasonDriveLost VolumeConditionReason = "DriveLost"
VolumeConditionReasonDriveLost VolumeConditionReason = "DriveLost"
VolumeConditionReasonNotMounted VolumeConditionReason = "NotMounted"
VolumeConditionReasonNoError VolumeConditionReason = "NoError"
Review thread on this line:

Member: This looks odd. Can't we infer healthy status from the other conditions, where those conditions do not have metav1.ConditionTrue?

@Praveenrajmani (Collaborator, Author) commented on Sep 25, 2023:
The reason in a condition cannot be empty, so "normal" volumes will carry the reason "NoError" in their condition. For example:

> kubectl get nodes -o yaml | grep reason
      reason: KubeletHasSufficientMemory
      reason: KubeletHasNoDiskPressure
      reason: KubeletHasSufficientPID
      reason: KubeletReady

Member reply:
Kubernetes uses positive reasons in conditions, but we use negative reasons. It looks odd for an Error type to have a NoError reason. Either treat Status == metav1.ConditionTrue as the enabled condition, or use positive reasons.
)

// VolumeConditionMessage denotes drive message. Allows maximum upto 32768 chars.
type VolumeConditionMessage string

// Enum values of VolumeConditionMessage type.
const (
VolumeConditionMessageDriveLost VolumeConditionMessage = "Associated drive was removed. Refer https://github.com/minio/directpv/blob/master/docs/troubleshooting.md"
VolumeConditionMessageDriveLost VolumeConditionMessage = "Associated drive was removed. Refer https://github.com/minio/directpv/blob/master/docs/troubleshooting.md"
VolumeConditionMessageStagingPathNotMounted VolumeConditionMessage = "Staging path is unmounted from the host. Please restart the workload"
VolumeConditionMessageTargetPathNotMounted VolumeConditionMessage = "Target path is unmounted from the host. Please restart the workload"
)

// DriveConditionType denotes drive condition. Allows maximum upto 316 chars.
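To make the reviewers' point concrete: `metav1.Condition` requires a non-empty `Reason` (API validation rejects conditions without one), which is why a healthy volume still carries the `NoError` reason. An illustrative snippet using the apimachinery condition helpers, not code from this PR:

```go
// Illustrative only: why Reason must always carry a value.
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	var conditions []metav1.Condition

	// Mark the volume abnormal: the staging path lost its mount.
	meta.SetStatusCondition(&conditions, metav1.Condition{
		Type:    "Error",      // types.VolumeConditionTypeError
		Status:  metav1.ConditionTrue,
		Reason:  "NotMounted", // types.VolumeConditionReasonNotMounted
		Message: "Staging path is unmounted from the host. Please restart the workload",
	})

	// Clear it again; Reason still must be non-empty, hence "NoError".
	meta.SetStatusCondition(&conditions, metav1.Condition{
		Type:   "Error",
		Status: metav1.ConditionFalse,
		Reason: "NoError", // types.VolumeConditionReasonNoError
	})

	fmt.Println(conditions[0].Status) // False
}
```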
41 changes: 41 additions & 0 deletions pkg/apis/directpv.min.io/v1beta1/volume.go
@@ -19,8 +19,10 @@ package v1beta1
import (
"strconv"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/minio/directpv/pkg/apis/directpv.min.io/types"
"github.com/minio/directpv/pkg/consts"
"github.com/minio/directpv/pkg/k8s"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

@@ -123,6 +125,12 @@ func (volume DirectPVVolume) IsDriveLost() bool {
return false
}

// HasError returns whether the volume is in an error state.
func (volume DirectPVVolume) HasError() bool {
condition := k8s.GetConditionByType(volume.Status.Conditions, string(types.VolumeConditionTypeError))
return condition != nil && condition.Status == metav1.ConditionTrue
}
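`k8s.GetConditionByType` is referenced here but defined outside this diff. It presumably behaves like apimachinery's `meta.FindStatusCondition`; a sketch under that assumption:

```go
// Assumed shape of the pkg/k8s helper; not shown in this PR's diff.
package k8s

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// GetConditionByType returns a pointer to the condition with the given type,
// or nil when no such condition exists.
func GetConditionByType(conditions []metav1.Condition, conditionType string) *metav1.Condition {
	for i := range conditions {
		if conditions[i].Type == conditionType {
			return &conditions[i]
		}
	}
	return nil
}
```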

// SetDriveLost sets associated drive is lost.
func (volume *DirectPVVolume) SetDriveLost() {
c := metav1.Condition{
@@ -316,6 +324,39 @@ func (volume *DirectPVVolume) Resume() bool {
return volume.RemoveLabel(types.SuspendLabelKey)
}

// ResetStageMountErrorCondition resets the stage volume mount error condition.
func (volume *DirectPVVolume) ResetStageMountErrorCondition() {
k8s.ResetConditionIfMatches(volume.Status.Conditions,
string(types.VolumeConditionTypeError),
string(types.VolumeConditionReasonNotMounted),
string(types.VolumeConditionMessageStagingPathNotMounted),
string(types.VolumeConditionReasonNoError))
}

// ResetTargetMountErrorCondition resets the target volume mount error condition.
func (volume *DirectPVVolume) ResetTargetMountErrorCondition() {
k8s.ResetConditionIfMatches(volume.Status.Conditions,
string(types.VolumeConditionTypeError),
string(types.VolumeConditionReasonNotMounted),
string(types.VolumeConditionMessageTargetPathNotMounted),
string(types.VolumeConditionReasonNoError))
}

// GetCSIVolumeCondition returns the CSI volume condition.
func (volume *DirectPVVolume) GetCSIVolumeCondition() *csi.VolumeCondition {
var isAbnormal bool
var message string
errorCondition := k8s.GetConditionByType(volume.Status.Conditions, string(types.VolumeConditionTypeError))
if errorCondition != nil && errorCondition.Status == metav1.ConditionTrue {
isAbnormal = true
message = errorCondition.Message
}
return &csi.VolumeCondition{
Abnormal: isAbnormal,
Message: message,
}
}
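The two `Reset*MountErrorCondition` helpers above delegate to `k8s.ResetConditionIfMatches`, which is also not shown in this diff. One plausible implementation matching the call signature used above (all internals assumed): the condition is cleared only when its recorded reason and message both match, so a staging-path check never wipes out a target-path error.

```go
// Assumed shape of the pkg/k8s helper; not shown in this PR's diff.
package k8s

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// ResetConditionIfMatches flips the matching condition to False with the
// given reset reason, but only if its current reason and message both match.
func ResetConditionIfMatches(conditions []metav1.Condition, conditionType, reason, message, resetReason string) {
	for i := range conditions {
		c := &conditions[i]
		if c.Type == conditionType && c.Reason == reason && c.Message == message {
			c.Status = metav1.ConditionFalse
			c.Reason = resetReason
			c.Message = ""
			c.LastTransitionTime = metav1.Now()
			return
		}
	}
}
```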

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// DirectPVVolumeList denotes list of volumes.
3 changes: 3 additions & 0 deletions pkg/consts/consts.go
@@ -97,4 +97,7 @@ const (

// TmpFS mount
TmpMountDir = AppRootDir + "/tmp"

// Volume Health Monitor
VolumeHealthMonitorIntervalInDuration = "10m"
)
3 changes: 3 additions & 0 deletions pkg/consts/consts.go.in
@@ -95,4 +95,7 @@ const (

// TmpFS mount
TmpMountDir = AppRootDir + "/tmp"

// Volume Health Monitor
VolumeHealthMonitorIntervalInDuration = "10m"
)
82 changes: 78 additions & 4 deletions pkg/csi/controller/server.go
@@ -97,6 +97,21 @@ func (c *Server) ControllerGetCapabilities(_ context.Context, _ *csi.ControllerG
Rpc: &csi.ControllerServiceCapability_RPC{Type: csi.ControllerServiceCapability_RPC_EXPAND_VOLUME},
},
},
{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{Type: csi.ControllerServiceCapability_RPC_LIST_VOLUMES},
},
},
{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{Type: csi.ControllerServiceCapability_RPC_GET_VOLUME},
},
},
{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{Type: csi.ControllerServiceCapability_RPC_VOLUME_CONDITION},
},
},
},
}, nil
}
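Advertising `LIST_VOLUMES`, `GET_VOLUME`, and `VOLUME_CONDITION` is what lets the external health monitor controller discover that this driver reports volume conditions. A hedged sketch of probing the capability over the controller socket; the socket path and client setup are illustrative assumptions:

```go
// Illustrative capability probe; the socket path is an assumption.
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/container-storage-interface/spec/lib/go/csi"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	conn, err := grpc.Dial("unix:///csi/csi.sock",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	resp, err := csi.NewControllerClient(conn).ControllerGetCapabilities(
		context.Background(), &csi.ControllerGetCapabilitiesRequest{})
	if err != nil {
		log.Fatal(err)
	}
	for _, c := range resp.GetCapabilities() {
		if c.GetRpc().GetType() == csi.ControllerServiceCapability_RPC_VOLUME_CONDITION {
			fmt.Println("driver reports VOLUME_CONDITION support")
		}
	}
}
```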
@@ -359,8 +374,52 @@ func (c *Server) ControllerExpandVolume(ctx context.Context, req *csi.Controller

// ListVolumes implements ListVolumes controller RPC
// reference: https://github.com/container-storage-interface/spec/blob/master/spec.md#listvolumes
func (c *Server) ListVolumes(_ context.Context, _ *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
return nil, status.Error(codes.Unimplemented, "unimplemented")
func (c *Server) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
result, err := client.VolumeClient().List(ctx, metav1.ListOptions{
Limit: int64(req.GetMaxEntries()),
Continue: req.GetStartingToken(),
})
if err != nil {
if req.GetStartingToken() != "" {
return nil, status.Errorf(codes.Aborted, "unable to list volumes: %v", err)
}
return nil, status.Errorf(codes.Internal, "unable to list volumes: %v", err)
}
var volumeEntries []*csi.ListVolumesResponse_Entry
for _, volume := range result.Items {
csiVolume, err := getCSIVolume(ctx, &volume)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
volumeEntries = append(volumeEntries, &csi.ListVolumesResponse_Entry{
Volume: csiVolume,
Status: &csi.ListVolumesResponse_VolumeStatus{
VolumeCondition: volume.GetCSIVolumeCondition(),
},
})
}
return &csi.ListVolumesResponse{
Entries: volumeEntries,
NextToken: result.Continue,
}, nil
}
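This maps the CSI paging contract onto the Kubernetes list API: `MaxEntries` becomes `Limit`, `StartingToken` becomes the `Continue` token, and a list failure with a non-empty token surfaces as `ABORTED`, as the CSI spec requires for invalid tokens. A sketch of a consumer driving that pagination, assuming the `csi`, `context`, and `fmt` imports and omitting client construction:

```go
// Illustrative consumer of the paged ListVolumes RPC, roughly what an
// external health monitor could do with the volume conditions.
func printAbnormalVolumes(ctx context.Context, c csi.ControllerClient) error {
	var token string
	for {
		resp, err := c.ListVolumes(ctx, &csi.ListVolumesRequest{
			MaxEntries:    100, // the server may return fewer entries
			StartingToken: token,
		})
		if err != nil {
			return err
		}
		for _, entry := range resp.GetEntries() {
			if cond := entry.GetStatus().GetVolumeCondition(); cond.GetAbnormal() {
				fmt.Printf("volume %s abnormal: %s\n",
					entry.GetVolume().GetVolumeId(), cond.GetMessage())
			}
		}
		if token = resp.GetNextToken(); token == "" {
			return nil // no more pages
		}
	}
}
```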

func getCSIVolume(ctx context.Context, volume *types.Volume) (*csi.Volume, error) {
drive, err := client.DriveClient().Get(ctx, string(volume.GetDriveID()), metav1.GetOptions{
TypeMeta: types.NewDriveTypeMeta(),
})
if err != nil {
return nil, err
}
return &csi.Volume{
CapacityBytes: volume.Status.TotalCapacity,
VolumeId: volume.Name,
AccessibleTopology: []*csi.Topology{
{
Segments: drive.Status.Topology,
},
},
}, nil
}

// ControllerPublishVolume - controller RPC to publish volumes
@@ -377,8 +436,23 @@ func (c *Server) ControllerUnpublishVolume(_ context.Context, _ *csi.ControllerU

// ControllerGetVolume - controller RPC for get volume
// reference: https://github.com/container-storage-interface/spec/blob/master/spec.md#controllergetvolume
func (c *Server) ControllerGetVolume(_ context.Context, _ *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "unimplemented")
func (c *Server) ControllerGetVolume(ctx context.Context, req *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
volume, err := client.VolumeClient().Get(
ctx, req.GetVolumeId(), metav1.GetOptions{TypeMeta: types.NewVolumeTypeMeta()},
)
if err != nil {
return nil, status.Error(codes.NotFound, err.Error())
}
csiVolume, err := getCSIVolume(ctx, volume)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return &csi.ControllerGetVolumeResponse{
Volume: csiVolume,
Status: &csi.ControllerGetVolumeResponse_VolumeStatus{
VolumeCondition: volume.GetCSIVolumeCondition(),
},
}, nil
}

// ListSnapshots - controller RPC for listing snapshots