From c1c0ae869cfe76027cf3c7dfeb76d5c65078a1ac Mon Sep 17 00:00:00 2001
From: Vicente Cheng
Date: Thu, 4 Jul 2024 10:52:08 +0800
Subject: [PATCH] controller: support RWX volume

Signed-off-by: Vicente Cheng
---
 pkg/csi/controller_server.go | 299 ++++++++++++++++++++++++++++-------
 pkg/csi/manager.go           |  10 +-
 pkg/csi/node_server.go       | 184 ++++++++++++++++++---
 pkg/csi/util.go              |   1 +
 4 files changed, 414 insertions(+), 80 deletions(-)

diff --git a/pkg/csi/controller_server.go b/pkg/csi/controller_server.go
index 8bb84b3d..4c4b42de 100644
--- a/pkg/csi/controller_server.go
+++ b/pkg/csi/controller_server.go
@@ -7,6 +7,8 @@ import (
 	"time"

 	"github.com/container-storage-interface/spec/lib/go/csi"
+	networkfsv1 "github.com/harvester/networkfs-manager/pkg/apis/harvesterhci.io/v1beta1"
+	harvnetworkfsset "github.com/harvester/networkfs-manager/pkg/generated/clientset/versioned"
 	lhv1beta2 "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2"
 	lhclientset "github.com/longhorn/longhorn-manager/k8s/pkg/client/clientset/versioned"
 	ctlv1 "github.com/rancher/wrangler/pkg/generated/controllers/core/v1"
@@ -31,22 +33,24 @@ const (
 	tickAttachDetach = 2 * time.Second
 	paramHostSC      = "hostStorageClass"
 	LHVolumeNS       = "longhorn-system"
+	HARVNS           = "harvester-system"
 )

 type ControllerServer struct {
 	namespace        string
 	hostStorageClass string

-	coreClient    ctlv1.Interface
-	storageClient ctlstoragev1.Interface
-	virtClient    kubecli.KubevirtClient
-	lhClient      *lhclientset.Clientset
+	coreClient      ctlv1.Interface
+	storageClient   ctlstoragev1.Interface
+	virtClient      kubecli.KubevirtClient
+	lhClient        *lhclientset.Clientset
+	harvNetFSClient *harvnetworkfsset.Clientset

 	caps        []*csi.ControllerServiceCapability
 	accessModes []*csi.VolumeCapability_AccessMode
 }

-func NewControllerServer(coreClient ctlv1.Interface, storageClient ctlstoragev1.Interface, virtClient kubecli.KubevirtClient, lhClient *lhclientset.Clientset, namespace string, hostStorageClass string) *ControllerServer {
+func NewControllerServer(coreClient ctlv1.Interface, storageClient ctlstoragev1.Interface, virtClient kubecli.KubevirtClient, lhClient *lhclientset.Clientset, harvNetFSClient *harvnetworkfsset.Clientset, namespace string, hostStorageClass string) *ControllerServer {
 	return &ControllerServer{
 		namespace:        namespace,
 		hostStorageClass: hostStorageClass,
@@ -54,6 +58,7 @@ func NewControllerServer(coreClient ctlv1.Interface, storageClient ctlstoragev1.
 		storageClient: storageClient,
 		virtClient:    virtClient,
 		lhClient:      lhClient,
+		harvNetFSClient: harvNetFSClient,
 		caps: getControllerServiceCapabilities(
 			[]csi.ControllerServiceCapability_RPC_Type{
 				csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
@@ -64,6 +69,7 @@ func NewControllerServer(coreClient ctlv1.Interface, storageClient ctlstoragev1.
 		accessModes: getVolumeCapabilityAccessModes(
 			[]csi.VolumeCapability_AccessMode_Mode{
 				csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
+				csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
 			}),
 	}
 }
@@ -95,7 +101,6 @@ func (cs *ControllerServer) validStorageClass(storageClassName string) error {

 func (cs *ControllerServer) CreateVolume(_ context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
 	logrus.Infof("ControllerServer create volume req: %v", req)
-	targetSC := cs.hostStorageClass

 	if err := cs.validateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
 		logrus.Errorf("CreateVolume: invalid create volume req: %v", req)
@@ -107,6 +112,7 @@ func (cs *ControllerServer) CreateVolum
 		return nil, status.Error(codes.InvalidArgument, "Volume Name cannot be empty")
 	}
 	volumeCaps := req.GetVolumeCapabilities()
+	logrus.Debugf("Getting volumeCapabilities: %+v", volumeCaps)
 	if err := cs.validateVolumeCapabilities(volumeCaps); err != nil {
 		return nil, err
 	}
@@ -123,39 +129,6 @@ func (cs *ControllerServer) CreateVolum
 		return nil, status.Error(codes.Unimplemented, "")
 	}

-	// Create a PVC from the host cluster
-	volumeMode := corev1.PersistentVolumeBlock
-	pvc := &corev1.PersistentVolumeClaim{
-		ObjectMeta: metav1.ObjectMeta{
-			Namespace: cs.namespace,
-			Name:      req.Name,
-		},
-		Spec: corev1.PersistentVolumeClaimSpec{
-			AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteMany},
-			VolumeMode:  &volumeMode,
-		},
-	}
-
-	/*
-	 * Let's handle SC
-	 * SC define > controller define
-	 *
-	 * Checking mechanism:
-	 * 1. If guest cluster is not be allowed to list host cluster StorageClass, just continue.
-	 * 2. If guest cluster can list host cluster StorageClass, check it.
-	 */
-	if val, exists := volumeParameters[paramHostSC]; exists {
-		//If StorageClass has `hostStorageClass` parameter, check whether it is valid or not.
-		if err := cs.validStorageClass(val); err != nil {
-			return nil, err
-		}
-		targetSC = val
-	}
-	if targetSC != "" {
-		pvc.Spec.StorageClassName = pointer.StringPtr(targetSC)
-		logrus.Infof("Set up the target StorageClass to : %s", *pvc.Spec.StorageClassName)
-	}
-	logrus.Debugf("The PVC content wanted is: %+v", pvc)
 	volSizeBytes := int64(utils.MinimalVolumeSize)
 	if req.GetCapacityRange() != nil {
 		volSizeBytes = req.GetCapacityRange().GetRequiredBytes()
@@ -164,19 +137,46 @@ func (cs *ControllerServer) CreateVolum
 		logrus.Warnf("Request volume %v size %v is smaller than minimal size %v, set it to minimal size.", req.Name, volSizeBytes, utils.MinimalVolumeSize)
 		volSizeBytes = utils.MinimalVolumeSize
 	}
-	// Round up to multiple of 2 * 1024 * 1024
-	volSizeBytes = utils.RoundUpSize(volSizeBytes)
-	pvc.Spec.Resources = corev1.ResourceRequirements{
-		Requests: corev1.ResourceList{
-			corev1.ResourceStorage: *resource.NewQuantity(volSizeBytes, resource.BinarySI),
-		},
+
+	// Create a PVC from the host cluster
+	pvc, err := cs.generateHostClusterPVCFormat(req.Name, volumeCaps, volumeParameters, volSizeBytes)
+	if err != nil {
+		return nil, err
 	}
+	logrus.Debugf("The PVC content wanted is: %+v", pvc)
 	resPVC, err := cs.coreClient.PersistentVolumeClaim().Create(pvc)
 	if err != nil {
 		return nil, status.Error(codes.Internal, err.Error())
 	}
+	if isLHRWXVolume(resPVC) {
+		if !cs.waitForLHVolumeName(resPVC.Name) {
+			return nil, status.Errorf(codes.DeadlineExceeded, "Timed out waiting for the volume name of PVC %s", resPVC.Name)
+		}
+
+		resPVC, err := cs.coreClient.PersistentVolumeClaim().Get(cs.namespace, resPVC.Name, metav1.GetOptions{})
+		if err != nil {
+			return nil, status.Errorf(codes.Internal, "Failed to get PVC %s: %v", resPVC.Name, err)
+		}
+
+		// this is a Longhorn RWX (NFS) volume, so create the corresponding NetworkFilesystem CR
+		networkfilesystem := &networkfsv1.NetworkFilesystem{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      resPVC.Spec.VolumeName,
+				Namespace: HARVNS,
+			},
+			Spec: networkfsv1.NetworkFSSpec{
+				NetworkFSName: resPVC.Spec.VolumeName,
+				DesiredState:  networkfsv1.NetworkFSStateDisabled,
+				Provisioner:   LHName,
+			},
+		}
+		if _, err := cs.harvNetFSClient.HarvesterhciV1beta1().NetworkFilesystems(HARVNS).Create(context.TODO(), networkfilesystem, metav1.CreateOptions{}); err != nil {
+			return nil, status.Error(codes.Internal, err.Error())
+		}
+	}
+
 	return &csi.CreateVolumeResponse{
 		Volume: &csi.Volume{
 			VolumeId:      resPVC.Name,
@@ -197,6 +197,25 @@ func (cs *ControllerServer) DeleteVolume(_ context.Context, req *csi.DeleteVolum
 		return nil, status.Errorf(codes.Internal, "Invalid delete volume req: %v", err)
 	}

+	resPVC, err := cs.coreClient.PersistentVolumeClaim().Get(cs.namespace, req.GetVolumeId(), metav1.GetOptions{})
+	if errors.IsNotFound(err) {
+		return &csi.DeleteVolumeResponse{}, nil
+	}
+	if err != nil {
+		return nil, status.Errorf(codes.Internal, "Failed to get PVC %s: %v", req.GetVolumeId(), err)
+	}
+
+	if isLHRWXVolume(resPVC) {
+		// nothing to do if the networkfilesystem is already deleted
+		if _, err := cs.harvNetFSClient.HarvesterhciV1beta1().NetworkFilesystems(HARVNS).Get(context.TODO(), resPVC.Spec.VolumeName, metav1.GetOptions{}); err != nil && !errors.IsNotFound(err) {
+			return nil, status.Errorf(codes.Internal, "Failed to get NetworkFileSystem %s: %v", resPVC.Spec.VolumeName, err)
+		} else if err == nil {
+			if err := cs.harvNetFSClient.HarvesterhciV1beta1().NetworkFilesystems(HARVNS).Delete(context.TODO(), resPVC.Spec.VolumeName, metav1.DeleteOptions{}); err != nil {
+				return nil, status.Errorf(codes.Internal, "Failed to delete NetworkFileSystem %s: %v", resPVC.Spec.VolumeName, err)
+			}
+		}
+	}
+
 	if err := cs.coreClient.PersistentVolumeClaim().Delete(cs.namespace, req.GetVolumeId(), &metav1.DeleteOptions{}); errors.IsNotFound(err) {
 		return &csi.DeleteVolumeResponse{}, nil
 	} else if err != nil {
@@ -261,6 +280,15 @@ func (cs *ControllerServer) ControllerPublishVolume(_ context.Context, req *csi.
 		return nil, status.Errorf(codes.Aborted, "The PVC %s in phase %v is not ready to be attached", req.GetVolumeId(), pvc.Status.Phase)
 	}
+
+	// RWX volumes: Longhorn-backed ones are published via their NetworkFilesystem, anything else is a no-op
+	if volumeCapability.AccessMode.GetMode() == csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER {
+		if isLHRWXVolume(pvc) {
+			return cs.publishRWXVolume(pvc)
+		}
+		logrus.Info("Do no-op for non-LH RWX volume")
+		return &csi.ControllerPublishVolumeResponse{}, nil
+	}
 	lhVolumeName := pvc.Spec.VolumeName

 	// we should wait for the volume to be detached from the previous node
@@ -323,6 +351,9 @@ func (cs *ControllerServer) ControllerUnpublishVolume(_ context.Context, req *cs
 			req.GetVolumeId(), pvc.Status.Phase)
 	}

+	if isLHRWXVolume(pvc) {
+		return cs.unpublishRWXVolume(pvc)
+	}
 	volumeHotplugged := false
 	vmi, err := cs.virtClient.VirtualMachineInstance(cs.namespace).Get(context.TODO(), req.GetNodeId(), &metav1.GetOptions{})
 	if err != nil {
@@ -374,6 +405,10 @@ func (cs *ControllerServer) ListSnapshots(context.Context, *csi.ListSnapshotsReq
 	return nil, status.Error(codes.Unimplemented, "")
 }

+func (cs *ControllerServer) ControllerModifyVolume(context.Context, *csi.ControllerModifyVolumeRequest) (*csi.ControllerModifyVolumeResponse, error) {
+	return nil, status.Error(codes.Unimplemented, "")
+}
+
 func (cs *ControllerServer) ControllerExpandVolume(_ context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
 	pvc, err := cs.coreClient.PersistentVolumeClaim().Get(cs.namespace, req.GetVolumeId(), metav1.GetOptions{})
 	if err != nil {
@@ -428,6 +463,46 @@ func (cs *ControllerServer) ControllerExpandVolume(_ context.Context, req *csi.C
 	}, nil
 }

+func (cs *ControllerServer) publishRWXVolume(pvc *corev1.PersistentVolumeClaim) (*csi.ControllerPublishVolumeResponse, error) {
+	networkfs, err := cs.harvNetFSClient.HarvesterhciV1beta1().NetworkFilesystems(HARVNS).Get(context.TODO(), pvc.Spec.VolumeName, metav1.GetOptions{})
+	if err != nil {
+		return nil, status.Errorf(codes.Internal, "Failed to get NetworkFileSystem %s: %v", pvc.Spec.VolumeName, err)
+	}
+
+	if networkfs.Spec.DesiredState == networkfsv1.NetworkFSStateEnabled || networkfs.Status.State == networkfsv1.NetworkFSStateEnabling {
+		// do nothing if the networkfilesystem is already enabled
+		return &csi.ControllerPublishVolumeResponse{}, nil
+	}
+
+	networkfs.Spec.DesiredState = networkfsv1.NetworkFSStateEnabled
+	if _, err := cs.harvNetFSClient.HarvesterhciV1beta1().NetworkFilesystems(HARVNS).Update(context.TODO(), networkfs, metav1.UpdateOptions{}); err != nil {
+		return nil, status.Errorf(codes.Internal, "Failed to enable NetworkFileSystem %s: %v", pvc.Spec.VolumeName, err)
+	}
+	return &csi.ControllerPublishVolumeResponse{}, nil
+}
+
+func (cs *ControllerServer) unpublishRWXVolume(pvc *corev1.PersistentVolumeClaim) (*csi.ControllerUnpublishVolumeResponse, error) {
+	networkfs, err := cs.harvNetFSClient.HarvesterhciV1beta1().NetworkFilesystems(HARVNS).Get(context.TODO(), pvc.Spec.VolumeName, metav1.GetOptions{})
NetworkFileSystem %s: %v", pvc.Spec.VolumeName, err) + } + + if errors.IsNotFound(err) { + // do nothing if the networkfilesystem is already deleted + return &csi.ControllerUnpublishVolumeResponse{}, nil + } + if networkfs.Spec.DesiredState == networkfsv1.NetworkFSStateDisabled || networkfs.Status.State == networkfsv1.NetworkFSStateDisabling { + // do nothing if the networkfilesystem is already disabled + return &csi.ControllerUnpublishVolumeResponse{}, nil + } + + networkfs.Spec.DesiredState = networkfsv1.NetworkFSStateDisabled + if _, err := cs.harvNetFSClient.HarvesterhciV1beta1().NetworkFilesystems(HARVNS).Update(context.TODO(), networkfs, metav1.UpdateOptions{}); err != nil { + return nil, status.Errorf(codes.Internal, "Failed to disable NetworkFileSystem %s: %v", pvc.Spec.VolumeName, err) + } + return &csi.ControllerUnpublishVolumeResponse{}, nil +} + func (cs *ControllerServer) validateControllerServiceRequest(c csi.ControllerServiceCapability_RPC_Type) error { if c == csi.ControllerServiceCapability_RPC_UNKNOWN { return nil @@ -469,6 +544,99 @@ func (cs *ControllerServer) validateVolumeCapabilities(volumeCaps []*csi.VolumeC return nil } +func (cs *ControllerServer) waitForPVCState(name string, stateDescription string, + predicate func(pvc *corev1.PersistentVolumeClaim) bool) bool { + timer := time.NewTimer(timeoutAttachDetach) + defer timer.Stop() + timeout := timer.C + + ticker := time.NewTicker(tickAttachDetach) + defer ticker.Stop() + tick := ticker.C + + for { + select { + case <-timeout: + logrus.Warnf("waitForPVCState: timeout while waiting for PVC %s state %s", name, stateDescription) + return false + case <-tick: + logrus.Debugf("Polling PVC %s state for %s at %s", name, stateDescription, time.Now().String()) + existVol, err := cs.coreClient.PersistentVolumeClaim().Get(cs.namespace, name, metav1.GetOptions{}) + if err != nil { + logrus.Warnf("waitForPVCState: error while waiting for PVC %s state %s error %s", name, stateDescription, err) + continue + } + if predicate(existVol) { + return true + } + } + } +} + +func (cs *ControllerServer) generateHostClusterPVCFormat(name string, volCaps []*csi.VolumeCapability, volumeParameters map[string]string, volSizeBytes int64) (*corev1.PersistentVolumeClaim, error) { + pvc := &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: cs.namespace, + Name: name, + }, + Spec: corev1.PersistentVolumeClaimSpec{ + AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteMany}, + }, + } + + volumeMode := cs.getVolumeModule(volCaps) + targetSC, err := cs.getStorageClass(volumeParameters) + if err != nil { + logrus.Errorf("Failed to get the StorageClass: %v", err) + return nil, err + } + // Round up to multiple of 2 * 1024 * 1024 + volSizeBytes = utils.RoundUpSize(volSizeBytes) + + pvc.Spec.VolumeMode = &volumeMode + if targetSC != "" { + pvc.Spec.StorageClassName = pointer.StringPtr(targetSC) + } + pvc.Spec.Resources = corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceStorage: *resource.NewQuantity(volSizeBytes, resource.BinarySI), + }, + } + return pvc, nil +} + +func (cs *ControllerServer) getVolumeModule(volCaps []*csi.VolumeCapability) corev1.PersistentVolumeMode { + for _, volCap := range volCaps { + if volCap.GetAccessMode().GetMode() == csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER { + return corev1.PersistentVolumeFilesystem + } + if volCap.GetAccessMode().GetMode() == csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER { + return corev1.PersistentVolumeBlock + } + } + 
return "" +} + +func (cs *ControllerServer) getStorageClass(volParameters map[string]string) (string, error) { + /* + * Let's handle SC + * SC define > controller define + * + * Checking mechanism: + * 1. If guest cluster is not be allowed to list host cluster StorageClass, just continue. + * 2. If guest cluster can list host cluster StorageClass, check it. + */ + targetSC := cs.hostStorageClass + if val, exists := volParameters[paramHostSC]; exists { + //If StorageClass has `hostStorageClass` parameter, check whether it is valid or not. + if err := cs.validStorageClass(val); err != nil { + return "", err + } + targetSC = val + } + return targetSC, nil +} + func getControllerServiceCapabilities(cl []csi.ControllerServiceCapability_RPC_Type) []*csi.ControllerServiceCapability { var cscs = make([]*csi.ControllerServiceCapability, len(cl)) @@ -524,31 +692,31 @@ func volAttachedCorrectly(volume *lhv1beta2.Volume, nodeID string) bool { return false } -func (cs *ControllerServer) waitForPVCState(name string, stateDescription string, - predicate func(pvc *corev1.PersistentVolumeClaim) bool) bool { - timer := time.NewTimer(timeoutAttachDetach) - defer timer.Stop() - timeout := timer.C - +func (cs *ControllerServer) waitForLHVolumeName(pvcName string) bool { + timeoutTimer := time.NewTimer(timeoutAttachDetach) + defer timeoutTimer.Stop() ticker := time.NewTicker(tickAttachDetach) defer ticker.Stop() + + timeout := timeoutTimer.C tick := ticker.C for { select { case <-timeout: - logrus.Warnf("waitForPVCState: timeout while waiting for PVC %s state %s", name, stateDescription) + logrus.Warnf("waitForLHVolumeName: timeout while waiting for volume %s to be created", pvcName) return false case <-tick: - logrus.Debugf("Polling PVC %s state for %s at %s", name, stateDescription, time.Now().String()) - existVol, err := cs.coreClient.PersistentVolumeClaim().Get(cs.namespace, name, metav1.GetOptions{}) + resPVC, err := cs.coreClient.PersistentVolumeClaim().Get(cs.namespace, pvcName, metav1.GetOptions{}) if err != nil { - logrus.Warnf("waitForPVCState: error while waiting for PVC %s state %s error %s", name, stateDescription, err) - continue + logrus.Warnf("waitForLHVolumeName: error while waiting for volume %s to be created: %v", pvcName, err) + return false } - if predicate(existVol) { - return true + if resPVC.Spec.VolumeName == "" { + logrus.Infof("volumeName is not set for PVC %s, continue to wait", pvcName) + continue } + return true } } } @@ -562,3 +730,18 @@ func getVirtLauncherWorkloadsFromLHVolume(volume *lhv1beta2.Volume) []string { } return virtWorkloads } + +func isLHRWXVolume(pvc *corev1.PersistentVolumeClaim) bool { + if pvc.Spec.VolumeMode == nil || *pvc.Spec.VolumeMode != corev1.PersistentVolumeFilesystem { + return false + } + if len(pvc.Spec.AccessModes) == 0 { + return false + } + for _, mode := range pvc.Spec.AccessModes { + if mode == corev1.ReadWriteMany { + return true + } + } + return false +} diff --git a/pkg/csi/manager.go b/pkg/csi/manager.go index 9d5eb149..124397b9 100644 --- a/pkg/csi/manager.go +++ b/pkg/csi/manager.go @@ -7,6 +7,7 @@ import ( "os" "slices" + harvnetworkfsset "github.com/harvester/networkfs-manager/pkg/generated/clientset/versioned" lhclientset "github.com/longhorn/longhorn-manager/k8s/pkg/client/clientset/versioned" "github.com/rancher/wrangler/pkg/generated/controllers/core" "github.com/rancher/wrangler/pkg/generated/controllers/storage" @@ -80,6 +81,11 @@ func (m *Manager) Run(cfg *config.Config) error { return err } + harvNetworkFSClient, err := 
+	if err != nil {
+		return err
+	}
+
 	nodeID := cfg.NodeID

 	ifaces, err := sysfsnet.Interfaces()
@@ -109,8 +115,8 @@ func (m *Manager) Run(cfg *config.Config) error {
 	}

 	m.ids = NewIdentityServer(driverName, version.FriendlyVersion())
-	m.ns = NewNodeServer(coreClient.Core().V1(), virtClient, nodeID, namespace)
-	m.cs = NewControllerServer(coreClient.Core().V1(), storageClient.Storage().V1(), virtClient, lhclient, namespace, cfg.HostStorageClass)
+	m.ns = NewNodeServer(coreClient.Core().V1(), virtClient, lhclient, harvNetworkFSClient, nodeID, namespace, restConfig.Host)
+	m.cs = NewControllerServer(coreClient.Core().V1(), storageClient.Storage().V1(), virtClient, lhclient, harvNetworkFSClient, namespace, cfg.HostStorageClass)

 	// Create GRPC servers
 	s := NewNonBlockingGRPCServer()
diff --git a/pkg/csi/node_server.go b/pkg/csi/node_server.go
index 06e763c8..1144ea5f 100644
--- a/pkg/csi/node_server.go
+++ b/pkg/csi/node_server.go
@@ -8,6 +8,11 @@ import (
 	"strings"

 	"github.com/container-storage-interface/spec/lib/go/csi"
+	cmd "github.com/harvester/go-common/command"
+	common "github.com/harvester/go-common/common"
+	networkfsv1 "github.com/harvester/networkfs-manager/pkg/apis/harvesterhci.io/v1beta1"
+	harvnetworkfsset "github.com/harvester/networkfs-manager/pkg/generated/clientset/versioned"
+	lhclientset "github.com/longhorn/longhorn-manager/k8s/pkg/client/clientset/versioned"
 	"github.com/pkg/errors"
 	ctlv1 "github.com/rancher/wrangler/pkg/generated/controllers/core/v1"
 	"github.com/sirupsen/logrus"
@@ -17,7 +22,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/kubernetes/pkg/volume/util/hostutil"
 	"k8s.io/mount-utils"
-	utilexec "k8s.io/utils/exec"
+	"k8s.io/utils/exec"
 	kubevirtv1 "kubevirt.io/api/core/v1"
 	"kubevirt.io/client-go/kubecli"
 )
@@ -25,14 +30,17 @@ import (
 var hostUtil = hostutil.NewHostUtil()

 type NodeServer struct {
-	namespace  string
-	coreClient ctlv1.Interface
-	virtClient kubecli.KubevirtClient
-	nodeID     string
-	caps       []*csi.NodeServiceCapability
+	namespace       string
+	coreClient      ctlv1.Interface
+	virtClient      kubecli.KubevirtClient
+	nodeID          string
+	caps            []*csi.NodeServiceCapability
+	vip             string
+	lhClient        *lhclientset.Clientset
+	harvNetFSClient *harvnetworkfsset.Clientset
 }

-func NewNodeServer(coreClient ctlv1.Interface, virtClient kubecli.KubevirtClient, nodeID string, namespace string) *NodeServer {
+func NewNodeServer(coreClient ctlv1.Interface, virtClient kubecli.KubevirtClient, lhClient *lhclientset.Clientset, harvNetFSClient *harvnetworkfsset.Clientset, nodeID string, namespace, vip string) *NodeServer {
 	return &NodeServer{
 		coreClient: coreClient,
 		virtClient: virtClient,
@@ -41,10 +49,114 @@ func NewNodeServer(coreClient ctlv1.Interface, virtClient kubecli.KubevirtClient
 		caps: getNodeServiceCapabilities(
 			[]csi.NodeServiceCapability_RPC_Type{
 				csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
+				csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
 			}),
+		vip:             vip,
+		lhClient:        lhClient,
+		harvNetFSClient: harvNetFSClient,
 	}
 }

+func (ns *NodeServer) NodeStageVolume(_ context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
+	volCaps := req.GetVolumeCapability()
+	if volCaps == nil {
+		return nil, status.Error(codes.InvalidArgument, "Missing volume capability in request")
+	}
+
+	volAccessMode := volCaps.GetAccessMode().GetMode()
+	if volAccessMode == csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER {
+		return ns.nodeStageRWXVolume(req)
+	}
+	return &csi.NodeStageVolumeResponse{}, nil
+}
+
+func (ns *NodeServer) nodeStageRWXVolume(req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
+	logrus.Infof("NodeStageVolume is called with req %+v", req)
+
+	stagingTargetPath := req.GetStagingTargetPath()
+	if stagingTargetPath == "" {
+		return nil, status.Error(codes.InvalidArgument, "staging target path missing in request")
+	}
+
+	volumeCapability := req.GetVolumeCapability()
+	if volumeCapability == nil {
+		return nil, status.Error(codes.InvalidArgument, "volume capability missing in request")
+	}
+
+	volName := req.GetVolumeId()
+	pvc, err := ns.coreClient.PersistentVolumeClaim().Get(ns.namespace, volName, metav1.GetOptions{})
+	if err != nil {
+		return nil, status.Errorf(codes.Internal, "Failed to get PVC %v: %v", volName, err)
+	}
+	lhVolName := pvc.Spec.VolumeName
+	networkfs, err := ns.harvNetFSClient.HarvesterhciV1beta1().NetworkFilesystems(HARVNS).Get(context.TODO(), lhVolName, metav1.GetOptions{})
+	if err != nil {
+		return nil, status.Errorf(codes.Internal, "Failed to get NetworkFS %v: %v", lhVolName, err)
+	}
+	if networkfs.Status.Status != networkfsv1.EndpointStatusReady {
+		return nil, status.Errorf(codes.Internal, "NetworkFS %v is not ready", lhVolName)
+	}
+	// default to NFSv4; use the mount options from the NetworkFilesystem status if provided
+	mountOpts := "vers=4"
+	if networkfs.Status.MountOpts != "" {
+		mountOpts = networkfs.Status.MountOpts
+	}
+
+	volumeEndpoint := networkfs.Status.Endpoint
+	logrus.Debugf("volumeServerEndpoint: %s", volumeEndpoint)
+	export := fmt.Sprintf("%s:/%s", volumeEndpoint, lhVolName)
+	logrus.Debugf("full endpoint: %s", export)
+	args := []string{"-t", "nfs", "-o", mountOpts}
+	args = append(args, export, stagingTargetPath)
+	logrus.Debugf("target args: %v", args)
+
+	// do mount
+	nspace := common.GetHostNamespacePath("/proc")
+	executor, err := cmd.NewExecutorWithNS(nspace)
+	if err != nil {
+		return nil, status.Errorf(codes.Internal, "Could not create executor: %v", err)
+	}
+	logrus.Infof("Mounting volume %s to %s", req.VolumeId, stagingTargetPath)
+	_, err = executor.Execute("mount", args)
+	if err != nil {
+		return nil, status.Errorf(codes.Internal, "Could not mount %v to the staging path: %v", export, err)
+	}
+
+	return &csi.NodeStageVolumeResponse{}, nil
+}
+
+func (ns *NodeServer) NodeUnstageVolume(_ context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
+	stagingTargetPath := req.GetStagingTargetPath()
+	if stagingTargetPath == "" {
+		return nil, status.Error(codes.InvalidArgument, "staging target path missing in request")
+	}
+
+	nspace := common.GetHostNamespacePath("/proc")
+	executor, err := cmd.NewExecutorWithNS(nspace)
+	if err != nil {
+		return nil, status.Errorf(codes.Internal, "Could not create executor: %v", err)
+	}
+
+	logrus.Infof("Unmounting volume %s from %s", req.VolumeId, stagingTargetPath)
+	out, err := executor.Execute("mountpoint", []string{stagingTargetPath})
+	if err != nil {
+		if strings.Contains(err.Error(), "is not a mountpoint") {
+			logrus.Infof("Volume %s is not mounted at %s, returning directly.", req.VolumeId, stagingTargetPath)
+			return &csi.NodeUnstageVolumeResponse{}, nil
+		}
+		return nil, status.Errorf(codes.Internal, "Could not check mountpoint %v: %v", stagingTargetPath, err)
+	}
+	if !strings.Contains(out, "is a mountpoint") {
+		return &csi.NodeUnstageVolumeResponse{}, nil
+	}
+
+	if _, err := executor.Execute("umount", []string{stagingTargetPath}); err != nil {
+		return nil, status.Errorf(codes.Internal, "Could not unmount %v: %v", stagingTargetPath, err)
+	}
+
+	return &csi.NodeUnstageVolumeResponse{}, nil
+}
+
 // NodePublishVolume will mount the volume /dev/ to target_path
 func (ns *NodeServer) NodePublishVolume(_ context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
 	logrus.Infof("NodeServer NodePublishVolume req: %v", req)
@@ -59,6 +171,46 @@ func (ns *NodeServer) NodePublishVolume(_ context.Context, req *csi.NodePublishV
 		return nil, status.Error(codes.InvalidArgument, "Missing volume capability in request")
 	}

+	volAccessMode := volumeCapability.GetAccessMode().GetMode()
+	if volAccessMode == csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER {
+		return ns.nodePublishRWXVolume(req, targetPath, volumeCapability)
+	} else if volAccessMode == csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER {
+		return ns.nodePublishRWOVolume(req, targetPath, volumeCapability)
+	}
+	return nil, status.Error(codes.InvalidArgument, "Invalid Access Mode, neither RWX nor RWO")
+}
+
+func (ns *NodeServer) nodePublishRWXVolume(req *csi.NodePublishVolumeRequest, targetPath string, _ *csi.VolumeCapability) (*csi.NodePublishVolumeResponse, error) {
+	stagingTargetPath := req.GetStagingTargetPath()
+	if stagingTargetPath == "" {
+		return nil, status.Error(codes.InvalidArgument, "staging target path missing in request")
+	}
+
+	// ensure the target path exists and check whether it is already mounted
+	mounterInst := mount.New("")
+	mounted, err := mounterInst.IsMountPoint(targetPath)
+	if os.IsNotExist(err) {
+		if err := os.MkdirAll(targetPath, 0750); err != nil {
+			return nil, status.Errorf(codes.Internal, "Could not create target dir %s: %v", targetPath, err)
+		}
+	}
+
+	// Already mounted, do nothing
+	if mounted {
+		return &csi.NodePublishVolumeResponse{}, nil
+	}
+
+	logrus.Debugf("stagingTargetPath: %s, targetPath: %s", stagingTargetPath, targetPath)
+	mountOptions := []string{"bind"}
+	if err := mounterInst.Mount(stagingTargetPath, targetPath, "", mountOptions); err != nil {
+		return nil, status.Errorf(codes.Internal, "failed to bind mount volume %s to target path %s: %v", req.GetVolumeId(), targetPath, err)
+	}
+
+	return &csi.NodePublishVolumeResponse{}, nil
+}
+
+func (ns *NodeServer) nodePublishRWOVolume(req *csi.NodePublishVolumeRequest, targetPath string, volCaps *csi.VolumeCapability) (*csi.NodePublishVolumeResponse, error) {
 	vmi, err := ns.virtClient.VirtualMachineInstance(ns.namespace).Get(context.TODO(), ns.nodeID, &metav1.GetOptions{})
 	if err != nil {
 		return nil, status.Errorf(codes.Internal, "Failed to get VMI %v: %v", ns.nodeID, err)
@@ -82,18 +234,18 @@ func (ns *NodeServer) NodePublishVolume(_ context.Context, req *csi.NodePublishV
 		return nil, status.Errorf(codes.Internal, "Failed to get device path for volume %v: %v", req.VolumeId, err)
 	}

-	mounter := &mount.SafeFormatAndMount{Interface: mount.New(""), Exec: utilexec.New()}
-	if volumeCapability.GetBlock() != nil {
+	mounter := &mount.SafeFormatAndMount{Interface: mount.New(""), Exec: exec.New()}
+	if volCaps.GetBlock() != nil {
 		return ns.nodePublishBlockVolume(req.GetVolumeId(), devicePath, targetPath, mounter)
-	} else if volumeCapability.GetMount() != nil {
+	} else if volCaps.GetMount() != nil {
 		// mounter assumes ext4 by default
-		fsType := volumeCapability.GetMount().GetFsType()
+		fsType := volCaps.GetMount().GetFsType()
 		if fsType == "" {
 			fsType = "ext4"
 		}

 		return ns.nodePublishMountVolume(req.GetVolumeId(), devicePath, targetPath,
-			fsType, volumeCapability.GetMount().GetMountFlags(), mounter)
+			fsType, volCaps.GetMount().GetMountFlags(), mounter)
 	}

 	return nil, status.Error(codes.InvalidArgument, "Invalid volume capability, neither Mount nor Block")
 }
@@ -216,14 +368,6 @@ func (ns *NodeServer) NodeUnpublishVolume(_ context.Context, req *csi.NodeUnpubl
 	return &csi.NodeUnpublishVolumeResponse{}, nil
 }

-func (ns *NodeServer) NodeStageVolume(context.Context, *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
-	return nil, status.Error(codes.Unimplemented, "")
-}
-
-func (ns *NodeServer) NodeUnstageVolume(context.Context, *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
-	return nil, status.Error(codes.Unimplemented, "")
-}
-
 func (ns *NodeServer) NodeGetVolumeStats(_ context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
 	if req.GetVolumeId() == "" {
 		return nil, status.Error(codes.InvalidArgument, "Missing volume ID in request")
diff --git a/pkg/csi/util.go b/pkg/csi/util.go
index af8a48a3..ba6fea7d 100644
--- a/pkg/csi/util.go
+++ b/pkg/csi/util.go
@@ -9,6 +9,7 @@ import (
 const (
 	deviceByIDDirectory = "/dev/disk/by-id/"
 	driverName          = "driver.harvesterhci.io"
+	LHName              = "Longhorn"
 )

 type volumeFilesystemStatistics struct {