diff --git a/main.go b/main.go
index 52ce79b..ac8d283 100644
--- a/main.go
+++ b/main.go
@@ -23,6 +23,7 @@ import (
 
 	"github.com/harvester/networkfs-manager/pkg/controller/endpoint"
 	"github.com/harvester/networkfs-manager/pkg/controller/networkfilesystem"
+	"github.com/harvester/networkfs-manager/pkg/controller/service"
 	"github.com/harvester/networkfs-manager/pkg/controller/sharemanager"
 	ntefsv1 "github.com/harvester/networkfs-manager/pkg/generated/controllers/harvesterhci.io"
 	ctrllonghorn "github.com/harvester/networkfs-manager/pkg/generated/controllers/longhorn.io"
@@ -118,14 +119,19 @@ func run(opt *utils.Option) error {
 	}
 
 	endpoints := clientv1.Core().V1().Endpoints()
+	services := clientv1.Core().V1().Service()
 	networkFilsystems := clientNetfs.Harvesterhci().V1beta1().NetworkFilesystem()
 	sharemanagers := lhCtrlClient.Longhorn().V1beta2().ShareManager()
 
 	cb := func(ctx context.Context) {
-		if err := endpoint.Register(ctx, endpoints, networkFilsystems, opt); err != nil {
+		if err := endpoint.Register(ctx, endpoints, networkFilsystems, services, opt); err != nil {
 			logrus.Errorf("failed to register endpoint controller: %v", err)
 		}
 
+		if err := service.Register(ctx, services, networkFilsystems, opt); err != nil {
+			logrus.Errorf("failed to register service controller: %v", err)
+		}
+
 		if err := networkfilesystem.Register(ctx, clientv1.Core().V1(), lhClient, endpoints, networkFilsystems, opt); err != nil {
 			logrus.Errorf("failed to register networkfilesystem controller: %v", err)
 		}
diff --git a/pkg/controller/endpoint/controller.go b/pkg/controller/endpoint/controller.go
index 43aaf13..71452d8 100644
--- a/pkg/controller/endpoint/controller.go
+++ b/pkg/controller/endpoint/controller.go
@@ -5,7 +5,7 @@ import (
 	"reflect"
 	"strings"
 
-	ctlendpoint "github.com/rancher/wrangler/v3/pkg/generated/controllers/core/v1"
+	ctlcorev1 "github.com/rancher/wrangler/v3/pkg/generated/controllers/core/v1"
 	"github.com/sirupsen/logrus"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -19,10 +19,12 @@ type Controller struct {
 	namespace string
 	nodeName  string
 
-	EndpointCache     ctlendpoint.EndpointsCache
-	Endpoints         ctlendpoint.EndpointsController
+	EndpointCache     ctlcorev1.EndpointsCache
+	Endpoints         ctlcorev1.EndpointsController
 	NetworkFSCache    ctlntefsv1.NetworkFilesystemCache
 	NetworkFilsystems ctlntefsv1.NetworkFilesystemController
+
+	serviceClient ctlcorev1.ServiceController
 }
 
 const (
@@ -30,7 +32,7 @@ const (
 )
 
 // Register register the longhorn node CRD controller
-func Register(ctx context.Context, endpoint ctlendpoint.EndpointsController, netfilesystems ctlntefsv1.NetworkFilesystemController, opt *utils.Option) error {
+func Register(ctx context.Context, endpoint ctlcorev1.EndpointsController, netfilesystems ctlntefsv1.NetworkFilesystemController, serviceClient ctlcorev1.ServiceController, opt *utils.Option) error {
 
 	c := &Controller{
 		namespace:         opt.Namespace,
@@ -39,6 +41,7 @@ func Register(ctx context.Context, endpoint ctlendpoint.EndpointsController, net
 		EndpointCache:     endpoint.Cache(),
 		NetworkFilsystems: netfilesystems,
 		NetworkFSCache:    netfilesystems.Cache(),
+		serviceClient:     serviceClient,
 	}
 
 	c.Endpoints.OnChange(ctx, netFSEndpointHandlerName, c.OnEndpointChange)
@@ -70,6 +73,17 @@ func (c *Controller) OnEndpointChange(_ string, endpoint *corev1.Endpoints) (*co
 		return nil, nil
 	}
 
+	// skip the update if service.Spec.ClusterIP is not ClusterIPNone (means we depend on the service)
+	service, err := c.serviceClient.Get(utils.LHNameSpace, endpoint.Name, metav1.GetOptions{})
+	if err != nil {
+		logrus.Errorf("Failed to get service %s: %v", endpoint.Name, err)
+		return nil, err
+	}
+	if service.Spec.ClusterIP != corev1.ClusterIPNone {
+		logrus.Infof("Skip update with endpoint change event because service %s is not ClusterIPNone", service.Name)
+		return nil, nil
+	}
+
 	networkFSCpy := networkFS.DeepCopy()
 	if len(endpoint.Subsets) == 0 || len(endpoint.Subsets[0].Addresses) == 0 {
 		networkFSCpy.Status.Endpoint = ""
diff --git a/pkg/controller/networkfilesystem/controller.go b/pkg/controller/networkfilesystem/controller.go
index 4b9c881..71d166e 100644
--- a/pkg/controller/networkfilesystem/controller.go
+++ b/pkg/controller/networkfilesystem/controller.go
@@ -52,7 +52,7 @@ func Register(ctx context.Context, coreClient ctlv1.Interface, lhClient *lhclien
 
 func (c *Controller) OnNetworkFSChange(_ string, networkFS *networkfsv1.NetworkFilesystem) (*networkfsv1.NetworkFilesystem, error) {
 	if networkFS == nil || networkFS.DeletionTimestamp != nil {
-		logrus.Infof("Skip this round because the network filesystem %s is deleting", networkFS.Name)
+		logrus.Infof("Skip this round because the network filesystem is deleting")
 		return nil, nil
 	}
 	logrus.Infof("Handling network filesystem %s change event", networkFS.Name)
@@ -62,6 +62,19 @@ func (c *Controller) OnNetworkFSChange(_ string, networkFS *networkfsv1.NetworkF
 		return nil, nil
 	}
 
+	if networkFS.Status.State == "" {
+		// means empty Status, init first
+		logrus.Infof("Init network filesystem %s status", networkFS.Name)
+		networkFSCpy := networkFS.DeepCopy()
+		status := networkfsv1.NetworkFSStatus{
+			State:  networkfsv1.NetworkFSStateUnknown,
+			Status: networkfsv1.EndpointStatusUnknown,
+			Type:   networkfsv1.NetworkFSTypeNFS,
+		}
+		networkFSCpy.Status = status
+		return c.NetworkFilsystems.UpdateStatus(networkFSCpy)
+	}
+
 	// Disabled -> Enabling -> Enabled -> Disabling -> Disabled
 	switch networkFS.Spec.DesiredState {
 	case networkfsv1.NetworkFSStateEnabled:
@@ -94,13 +107,19 @@ func (c *Controller) disableNetworkFS(networkFS *networkfsv1.NetworkFilesystem)
 
 func (c *Controller) enableNetworkFS(networkFS *networkfsv1.NetworkFilesystem) (*networkfsv1.NetworkFilesystem, error) {
 	logrus.Infof("Enable network filesystem %s", networkFS.Name)
-	// check endpoint status first
-	endpoint, err := c.endpointsClient.Get(utils.LHNameSpace, networkFS.Name, metav1.GetOptions{})
-	if err != nil && !errors.IsNotFound(err) {
-		logrus.Errorf("Failed to get endpoint %s: %v", networkFS.Name, err)
+	// After updating the LH volume attachment, we need to wait for the LH share manager to be provisioned.
+	lhShareMgr, err := c.lhClient.LonghornV1beta2().ShareManagers(utils.LHNameSpace).Get(context.Background(), networkFS.Name, metav1.GetOptions{})
+	if err != nil {
+		logrus.Errorf("Failed to get share manager %s: %v", networkFS.Name, err)
 		return nil, err
 	}
-	if !isEnabling(networkFS) || errors.IsNotFound(err) || endpoint.Subsets == nil || len(endpoint.Subsets) == 0 || len(endpoint.Subsets[0].Addresses) == 0 {
+
+	if !isEnabling(networkFS) && lhShareMgr.Status.State == longhornv2.ShareManagerStateStopped {
+		// enabling the network filesystem needs to wait for the previous operation (disable) to finish
+		if networkFS.Status.State != networkfsv1.NetworkFSStateDisabled {
+			logrus.Infof("Wait the previous operation (disable) to finish")
+			return nil, fmt.Errorf("wait the previous operation (disable) to finish on %v", networkFS.Name)
+		}
 		logrus.Infof("Endpoint %s is not ready, update lhVA to trigger export endpoint", networkFS.Name)
 		if err := c.updateLHVolumeAttachment(networkFS, true); err != nil {
 			return nil, err
@@ -114,12 +133,46 @@ func (c *Controller) enableNetworkFS(networkFS *networkfsv1.NetworkFilesystem) (
 		}
 	}
 
+	if lhShareMgr.Status.State != longhornv2.ShareManagerStateRunning {
+		logrus.Infof("Wait the share manager %s to be running", networkFS.Name)
+		return nil, fmt.Errorf("wait the share manager %s to be running", networkFS.Name)
+	}
+
 	// LH RWX volume endpoint should only have one address and one port
-	if len(endpoint.Subsets) > 1 || len(endpoint.Subsets[0].Addresses) > 1 || len(endpoint.Subsets[0].Ports) > 1 {
-		return nil, fmt.Errorf("endpoint %s has more than one subSets", networkFS.Name)
+	service, err := c.coreClient.Service().Get(utils.LHNameSpace, networkFS.Name, metav1.GetOptions{})
+	if err != nil {
+		logrus.Errorf("Failed to get service %s: %v", networkFS.Name, err)
+		return nil, err
 	}
-	if endpoint.Subsets[0].Ports[0].Name != "nfs" {
-		return nil, fmt.Errorf("endpoint %s has no nfs port", networkFS.Name)
+	networkFSCpy := networkFS.DeepCopy()
+	if service.Spec.ClusterIP != corev1.ClusterIPNone {
+		// means we depend on the service
+		if service.Spec.ClusterIP == "" {
+			// first init, service controller will update
+			logrus.Infof("Skip update on networkfs controller with the first time service init")
+			return nil, nil
+		}
+		networkFSCpy.Status.Endpoint = service.Spec.ClusterIP
+	} else {
+		// headless service (ClusterIPNone): the address comes from the endpoint
+		endpoint, err := c.endpointsClient.Get(utils.LHNameSpace, networkFS.Name, metav1.GetOptions{})
+		if err != nil && !errors.IsNotFound(err) {
+			logrus.Errorf("Failed to get endpoint %s: %v", networkFS.Name, err)
+			return nil, err
+		}
+		// a missing endpoint, or one without any address yet, is treated as not ready
+		if errors.IsNotFound(err) || len(endpoint.Subsets) == 0 || len(endpoint.Subsets[0].Addresses) == 0 {
+			logrus.Infof("Endpoint %s has no subsets (not ready), skip this round!", networkFS.Name)
+			return nil, nil
+		}
+		if len(endpoint.Subsets) > 1 || len(endpoint.Subsets[0].Addresses) > 1 || len(endpoint.Subsets[0].Ports) > 1 {
+			return nil, fmt.Errorf("endpoint %s has more than one subSets", networkFS.Name)
+		}
+		// guard the Ports[0] access: a subset without ports has no nfs port either
+		if len(endpoint.Subsets[0].Ports) == 0 || endpoint.Subsets[0].Ports[0].Name != "nfs" {
+			return nil, fmt.Errorf("endpoint %s has no nfs port", networkFS.Name)
+		}
+		networkFSCpy.Status.Endpoint = endpoint.Subsets[0].Addresses[0].IP
 	}
 
 	pv, err := c.coreClient.PersistentVolume().Get(networkFS.Name, metav1.GetOptions{})
@@ -132,8 +185,6 @@ func (c *Controller) enableNetworkFS(networkFS *networkfsv1.NetworkFilesystem) (
 		opts = pv.Spec.CSI.VolumeAttributes["nfsOptions"]
 	}
 	// update network filesystem status
-	networkFSCpy := networkFS.DeepCopy()
-	networkFSCpy.Status.Endpoint = endpoint.Subsets[0].Addresses[0].IP
 	networkFSCpy.Status.State = networkfsv1.NetworkFSStateEnabled
 	networkFSCpy.Status.Type = networkfsv1.NetworkFSTypeNFS
 	networkFSCpy.Status.Status = networkfsv1.EndpointStatusReady
diff --git a/pkg/controller/service/controller.go b/pkg/controller/service/controller.go
new file mode 100644
index 0000000..5007040
--- /dev/null
+++ b/pkg/controller/service/controller.go
@@ -0,0 +1,136 @@
+package service
+
+import (
+	"context"
+	"reflect"
+	"strings"
+
+	ctlcorev1 "github.com/rancher/wrangler/v3/pkg/generated/controllers/core/v1"
+	"github.com/sirupsen/logrus"
+	corev1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+	networkfsv1 "github.com/harvester/networkfs-manager/pkg/apis/harvesterhci.io/v1beta1"
+	ctlntefsv1 "github.com/harvester/networkfs-manager/pkg/generated/controllers/harvesterhci.io/v1beta1"
+	"github.com/harvester/networkfs-manager/pkg/utils"
+)
+
+// Controller syncs Service changes to the NetworkFilesystem CR of the same name.
+type Controller struct {
+	namespace string
+	nodeName  string
+
+	ServiceCache      ctlcorev1.ServiceCache
+	Services          ctlcorev1.ServiceController
+	NetworkFSCache    ctlntefsv1.NetworkFilesystemCache
+	NetworkFilsystems ctlntefsv1.NetworkFilesystemController
+}
+
+const (
+	netFSServiceHandlerName = "harvester-netfs-service-handler"
+)
+
+// Register registers the OnChange handler of the service controller
+func Register(ctx context.Context, services ctlcorev1.ServiceController, netfilesystems ctlntefsv1.NetworkFilesystemController, opt *utils.Option) error {
+
+	c := &Controller{
+		namespace:         opt.Namespace,
+		nodeName:          opt.NodeName,
+		Services:          services,
+		ServiceCache:      services.Cache(),
+		NetworkFilsystems: netfilesystems,
+		NetworkFSCache:    netfilesystems.Cache(),
+	}
+
+	c.Services.OnChange(ctx, netFSServiceHandlerName, c.OnServicesChange)
+	return nil
+}
+
+// OnServicesChange watches the services CR on change and syncs its ClusterIP up to the networkfilesystem CR
+func (c *Controller) OnServicesChange(_ string, service *corev1.Service) (*corev1.Service, error) {
+	if service == nil || service.DeletionTimestamp != nil {
+		logrus.Infof("Skip this round because service is deleted or deleting")
+		return nil, nil
+	}
+
+	// we only care about the service with name prefix "pvc-"
+	if !strings.HasPrefix(service.Name, "pvc-") {
+		return nil, nil
+	}
+
+	logrus.Infof("Handling service %s change event", service.Name)
+	networkFS, err := c.NetworkFilsystems.Get(c.namespace, service.Name, metav1.GetOptions{})
+	if err != nil && !apierrors.IsNotFound(err) {
+		logrus.Errorf("Failed to get networkFS %s: %v", service.Name, err)
+		return nil, err
+	}
+	if apierrors.IsNotFound(err) {
+		// there is no NetworkFilesystem to sync for this service, so this is not an error
+		logrus.Infof("Skip update with service change event because there is no corresponding NetworkFS %v", service.Name)
+		return nil, nil
+	}
+
+	// only update when the networkfilesystem is enabled.
+	if networkFS.Spec.DesiredState != networkfsv1.NetworkFSStateEnabled {
+		logrus.Infof("Skip update with service change event because networkfilesystem %s is not enabled", networkFS.Name)
+		return nil, nil
+	}
+
+	// means we depend on the endpoint, skip the update
+	if service.Spec.ClusterIP == corev1.ClusterIPNone {
+		return nil, nil
+	}
+	networkFSCpy := networkFS.DeepCopy()
+	// ClusterIP is not set yet: mark the endpoint as not ready (we will get a new service CR later)
+	if service.Spec.ClusterIP == "" {
+		networkFSCpy.Status.Endpoint = ""
+		networkFSCpy.Status.Status = networkfsv1.EndpointStatusNotReady
+		networkFSCpy.Status.Type = networkfsv1.NetworkFSTypeNFS
+		networkFSCpy.Status.State = networkfsv1.NetworkFSStateEnabling
+		conds := networkfsv1.NetworkFSCondition{
+			Type:               networkfsv1.ConditionTypeNotReady,
+			Status:             corev1.ConditionTrue,
+			LastTransitionTime: metav1.Now(),
+			Reason:             "Service is not ready",
+			Message:            "Service did not contain the corresponding address",
+		}
+		networkFSCpy.Status.NetworkFSConds = utils.UpdateNetworkFSConds(networkFSCpy.Status.NetworkFSConds, conds)
+	} else {
+		if networkFSCpy.Status.Endpoint != service.Spec.ClusterIP {
+			changedMsg := "Endpoint address is initialized with " + service.Spec.ClusterIP
+			if networkFSCpy.Status.Endpoint != "" {
+				changedMsg = "Endpoint address is changed, previous address is " + networkFSCpy.Status.Endpoint
+			}
+			conds := networkfsv1.NetworkFSCondition{
+				Type:               networkfsv1.ConditionTypeEndpointChanged,
+				Status:             corev1.ConditionTrue,
+				LastTransitionTime: metav1.Now(),
+				Reason:             "Endpoint is changed",
+				Message:            changedMsg,
+			}
+			networkFSCpy.Status.NetworkFSConds = utils.UpdateNetworkFSConds(networkFSCpy.Status.NetworkFSConds, conds)
+		}
+		networkFSCpy.Status.Endpoint = service.Spec.ClusterIP
+		networkFSCpy.Status.Status = networkfsv1.EndpointStatusReady
+		networkFSCpy.Status.Type = networkfsv1.NetworkFSTypeNFS
+		networkFSCpy.Status.State = networkfsv1.NetworkFSStateEnabling
+		conds := networkfsv1.NetworkFSCondition{
+			Type:               networkfsv1.ConditionTypeReady,
+			Status:             corev1.ConditionTrue,
+			LastTransitionTime: metav1.Now(),
+			Reason:             "Endpoint is ready",
+			Message:            "Endpoint contains the corresponding address",
+		}
+		networkFSCpy.Status.NetworkFSConds = utils.UpdateNetworkFSConds(networkFSCpy.Status.NetworkFSConds, conds)
+	}
+
+	if !reflect.DeepEqual(networkFS, networkFSCpy) {
+		if _, err := c.NetworkFilsystems.UpdateStatus(networkFSCpy); err != nil {
+			logrus.Errorf("Failed to update networkFS %s: %v", networkFS.Name, err)
+			return nil, err
+		}
+	}
+
+	return nil, nil
+}
diff --git a/pkg/controller/sharemanager/controller.go b/pkg/controller/sharemanager/controller.go
index 70f3275..1ad818a 100644
--- a/pkg/controller/sharemanager/controller.go
+++ b/pkg/controller/sharemanager/controller.go
@@ -60,10 +60,14 @@ func (c *Controller) OnShareManagerChange(_ string, sharemanager *longhornv1.Sha
 	logrus.Infof("Handling sharemanager %s change event", sharemanager.Name)
 
 	networkFS, err := c.NetworkFilsystems.Get(c.namespace, sharemanager.Name, metav1.GetOptions{})
-	if err != nil && apierrors.IsNotFound(err) {
+	if err != nil && !apierrors.IsNotFound(err) {
 		logrus.Errorf("Failed to get networkFS %s: %v", sharemanager.Name, err)
 		return nil, err
 	}
+	if apierrors.IsNotFound(err) {
+		logrus.Infof("Skip update with sharemanager change event because there is no corresponding NetworkFS %v", sharemanager.Name)
+		return nil, nil
+	}
 
 	// already disabled, return
 	if networkFS.Status.State == networkfsv1.NetworkFSStateDisabled {