Skip to content

Commit

Permalink
controller: misc update
Browse files Browse the repository at this point in the history
    - add new service controller
    - improve the networkfilesystem controller to handle
      service/endpoint well
    - improve the error handling
    - improve the status update with the first time init

Signed-off-by: Vicente Cheng <[email protected]>
  • Loading branch information
Vicente-Cheng committed Aug 27, 2024
1 parent f4e88ee commit 59f23e8
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 18 deletions.
8 changes: 7 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/harvester/networkfs-manager/pkg/controller/endpoint"
"github.com/harvester/networkfs-manager/pkg/controller/networkfilesystem"
"github.com/harvester/networkfs-manager/pkg/controller/service"
"github.com/harvester/networkfs-manager/pkg/controller/sharemanager"
ntefsv1 "github.com/harvester/networkfs-manager/pkg/generated/controllers/harvesterhci.io"
ctrllonghorn "github.com/harvester/networkfs-manager/pkg/generated/controllers/longhorn.io"
Expand Down Expand Up @@ -118,14 +119,19 @@ func run(opt *utils.Option) error {
}

endpoints := clientv1.Core().V1().Endpoints()
services := clientv1.Core().V1().Service()
networkFilsystems := clientNetfs.Harvesterhci().V1beta1().NetworkFilesystem()
sharemanagers := lhCtrlClient.Longhorn().V1beta2().ShareManager()

cb := func(ctx context.Context) {
if err := endpoint.Register(ctx, endpoints, networkFilsystems, opt); err != nil {
if err := endpoint.Register(ctx, endpoints, networkFilsystems, services, opt); err != nil {
logrus.Errorf("failed to register endpoint controller: %v", err)
}

if err := service.Register(ctx, services, networkFilsystems, opt); err != nil {
logrus.Errorf("failed to register service controller: %v", err)
}

if err := networkfilesystem.Register(ctx, clientv1.Core().V1(), lhClient, endpoints, networkFilsystems, opt); err != nil {
logrus.Errorf("failed to register networkfilesystem controller: %v", err)
}
Expand Down
22 changes: 18 additions & 4 deletions pkg/controller/endpoint/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"reflect"
"strings"

ctlendpoint "github.com/rancher/wrangler/v3/pkg/generated/controllers/core/v1"
ctlcorev1 "github.com/rancher/wrangler/v3/pkg/generated/controllers/core/v1"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -19,18 +19,20 @@ type Controller struct {
namespace string
nodeName string

EndpointCache ctlendpoint.EndpointsCache
Endpoints ctlendpoint.EndpointsController
EndpointCache ctlcorev1.EndpointsCache
Endpoints ctlcorev1.EndpointsController
NetworkFSCache ctlntefsv1.NetworkFilesystemCache
NetworkFilsystems ctlntefsv1.NetworkFilesystemController

serviceClient ctlcorev1.ServiceController
}

const (
netFSEndpointHandlerName = "harvester-netfs-endpoint-handler"
)

// Register register the longhorn node CRD controller
func Register(ctx context.Context, endpoint ctlendpoint.EndpointsController, netfilesystems ctlntefsv1.NetworkFilesystemController, opt *utils.Option) error {
func Register(ctx context.Context, endpoint ctlcorev1.EndpointsController, netfilesystems ctlntefsv1.NetworkFilesystemController, serviceClient ctlcorev1.ServiceController, opt *utils.Option) error {

c := &Controller{
namespace: opt.Namespace,
Expand All @@ -39,6 +41,7 @@ func Register(ctx context.Context, endpoint ctlendpoint.EndpointsController, net
EndpointCache: endpoint.Cache(),
NetworkFilsystems: netfilesystems,
NetworkFSCache: netfilesystems.Cache(),
serviceClient: serviceClient,
}

c.Endpoints.OnChange(ctx, netFSEndpointHandlerName, c.OnEndpointChange)
Expand Down Expand Up @@ -70,6 +73,17 @@ func (c *Controller) OnEndpointChange(_ string, endpoint *corev1.Endpoints) (*co
return nil, nil
}

// skip update if the service.Spec.ClusterIP is not ClusterIPNone (means the we depends on service)
service, err := c.serviceClient.Get(utils.LHNameSpace, endpoint.Name, metav1.GetOptions{})
if err != nil {
logrus.Errorf("Failed to get service %s: %v", endpoint.Name, err)
return nil, err
}
if service.Spec.ClusterIP != corev1.ClusterIPNone {
logrus.Infof("Skip update with endpoint change event because service %s is not ClusterIPNone", service.Name)
return nil, nil
}

networkFSCpy := networkFS.DeepCopy()
if len(endpoint.Subsets) == 0 || len(endpoint.Subsets[0].Addresses) == 0 {
networkFSCpy.Status.Endpoint = ""
Expand Down
71 changes: 59 additions & 12 deletions pkg/controller/networkfilesystem/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func Register(ctx context.Context, coreClient ctlv1.Interface, lhClient *lhclien

func (c *Controller) OnNetworkFSChange(_ string, networkFS *networkfsv1.NetworkFilesystem) (*networkfsv1.NetworkFilesystem, error) {
if networkFS == nil || networkFS.DeletionTimestamp != nil {
logrus.Infof("Skip this round because the network filesystem %s is deleting", networkFS.Name)
logrus.Infof("Skip this round because the network filesystem is deleting")
return nil, nil
}
logrus.Infof("Handling network filesystem %s change event", networkFS.Name)
Expand All @@ -62,6 +62,19 @@ func (c *Controller) OnNetworkFSChange(_ string, networkFS *networkfsv1.NetworkF
return nil, nil
}

if networkFS.Status.State == "" {
// means empty Status, init first
logrus.Infof("Init network filesystem %s status", networkFS.Name)
networkFSCpy := networkFS.DeepCopy()
status := networkfsv1.NetworkFSStatus{
State: networkfsv1.NetworkFSStateUnknown,
Status: networkfsv1.EndpointStatusUnknown,
Type: networkfsv1.NetworkFSTypeNFS,
}
networkFSCpy.Status = status
return c.NetworkFilsystems.UpdateStatus(networkFSCpy)
}

// Disabled -> Enabling -> Enabled -> Disabling -> Disabled
switch networkFS.Spec.DesiredState {
case networkfsv1.NetworkFSStateEnabled:
Expand Down Expand Up @@ -94,13 +107,19 @@ func (c *Controller) disableNetworkFS(networkFS *networkfsv1.NetworkFilesystem)
func (c *Controller) enableNetworkFS(networkFS *networkfsv1.NetworkFilesystem) (*networkfsv1.NetworkFilesystem, error) {
logrus.Infof("Enable network filesystem %s", networkFS.Name)

// check endpoint status first
endpoint, err := c.endpointsClient.Get(utils.LHNameSpace, networkFS.Name, metav1.GetOptions{})
if err != nil && !errors.IsNotFound(err) {
logrus.Errorf("Failed to get endpoint %s: %v", networkFS.Name, err)
// After update the LH volume attachment, we need to wait LH share manager provision.
lhShareMgr, err := c.lhClient.LonghornV1beta2().ShareManagers(utils.LHNameSpace).Get(context.Background(), networkFS.Name, metav1.GetOptions{})
if err != nil {
logrus.Errorf("Failed to get share manager %s: %v", networkFS.Name, err)
return nil, err
}
if !isEnabling(networkFS) || errors.IsNotFound(err) || endpoint.Subsets == nil || len(endpoint.Subsets) == 0 || len(endpoint.Subsets[0].Addresses) == 0 {

if !isEnabling(networkFS) && lhShareMgr.Status.State == longhornv2.ShareManagerStateStopped {
// enable the network filesystem need to wait the previous operation (disable) to finish
if networkFS.Status.State != networkfsv1.NetworkFSStateDisabled {
logrus.Infof("Wait the previous operation (disable) to finish")
return nil, fmt.Errorf("wait the previous operation (disable) to finish on %v", networkFS.Name)
}
logrus.Infof("Endpoint %s is not ready, update lhVA to trigger export endpoint", networkFS.Name)
if err := c.updateLHVolumeAttachment(networkFS, true); err != nil {
return nil, err
Expand All @@ -114,12 +133,42 @@ func (c *Controller) enableNetworkFS(networkFS *networkfsv1.NetworkFilesystem) (
}
}

if lhShareMgr.Status.State != longhornv2.ShareManagerStateRunning {
logrus.Infof("Wait the share manager %s to be running", networkFS.Name)
return nil, fmt.Errorf("wait the share manager %s to be running", networkFS.Name)
}

// LH RWX volume endpoint should only have one address and one port
if len(endpoint.Subsets) > 1 || len(endpoint.Subsets[0].Addresses) > 1 || len(endpoint.Subsets[0].Ports) > 1 {
return nil, fmt.Errorf("endpoint %s has more than one subSets", networkFS.Name)
service, err := c.coreClient.Service().Get(utils.LHNameSpace, networkFS.Name, metav1.GetOptions{})
if err != nil {
logrus.Errorf("Failed to get service %s: %v", networkFS.Name, err)
return nil, err
}
if endpoint.Subsets[0].Ports[0].Name != "nfs" {
return nil, fmt.Errorf("endpoint %s has no nfs port", networkFS.Name)
networkFSCpy := networkFS.DeepCopy()
if service.Spec.ClusterIP != corev1.ClusterIPNone {
// means we depends on the service
if service.Spec.ClusterIP == "" {
// first init, service controller will update
logrus.Infof("Skip update on networkfs controller with the first time service init")
return nil, nil
}
networkFSCpy.Status.Endpoint = service.Spec.ClusterIP
} else {
endpoint, err := c.endpointsClient.Get(utils.LHNameSpace, networkFS.Name, metav1.GetOptions{})
if err != nil && !errors.IsNotFound(err) {
logrus.Errorf("Failed to get endpoint %s: %v", networkFS.Name, err)
}
if len(endpoint.Subsets) == 0 {
logrus.Infof("Endpoint %s has no subsets (not ready), skip this round!", networkFS.Name)
return nil, nil
}
if len(endpoint.Subsets) > 1 || len(endpoint.Subsets[0].Addresses) > 1 || len(endpoint.Subsets[0].Ports) > 1 {
return nil, fmt.Errorf("endpoint %s has more than one subSets", networkFS.Name)
}
if endpoint.Subsets[0].Ports[0].Name != "nfs" {
return nil, fmt.Errorf("endpoint %s has no nfs port", networkFS.Name)
}
networkFSCpy.Status.Endpoint = endpoint.Subsets[0].Addresses[0].IP
}

pv, err := c.coreClient.PersistentVolume().Get(networkFS.Name, metav1.GetOptions{})
Expand All @@ -132,8 +181,6 @@ func (c *Controller) enableNetworkFS(networkFS *networkfsv1.NetworkFilesystem) (
opts = pv.Spec.CSI.VolumeAttributes["nfsOptions"]
}
// update network filesystem status
networkFSCpy := networkFS.DeepCopy()
networkFSCpy.Status.Endpoint = endpoint.Subsets[0].Addresses[0].IP
networkFSCpy.Status.State = networkfsv1.NetworkFSStateEnabled
networkFSCpy.Status.Type = networkfsv1.NetworkFSTypeNFS
networkFSCpy.Status.Status = networkfsv1.EndpointStatusReady
Expand Down
129 changes: 129 additions & 0 deletions pkg/controller/service/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package service

import (
"context"
"reflect"
"strings"

ctlcorev1 "github.com/rancher/wrangler/v3/pkg/generated/controllers/core/v1"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

networkfsv1 "github.com/harvester/networkfs-manager/pkg/apis/harvesterhci.io/v1beta1"
ctlntefsv1 "github.com/harvester/networkfs-manager/pkg/generated/controllers/harvesterhci.io/v1beta1"
"github.com/harvester/networkfs-manager/pkg/utils"
)

type Controller struct {
namespace string
nodeName string

ServiceCache ctlcorev1.ServiceCache
Services ctlcorev1.ServiceController
NetworkFSCache ctlntefsv1.NetworkFilesystemCache
NetworkFilsystems ctlntefsv1.NetworkFilesystemController
}

const (
netFSEndpointHandlerName = "harvester-netfs-service-handler"
)

// Register register the longhorn node CRD controller
func Register(ctx context.Context, services ctlcorev1.ServiceController, netfilesystems ctlntefsv1.NetworkFilesystemController, opt *utils.Option) error {

c := &Controller{
namespace: opt.Namespace,
nodeName: opt.NodeName,
Services: services,
ServiceCache: services.Cache(),
NetworkFilsystems: netfilesystems,
NetworkFSCache: netfilesystems.Cache(),
}

c.Services.OnChange(ctx, netFSEndpointHandlerName, c.OnServicesChange)
return nil
}

// OnServicesChange watch the services CR on change and sync up to networkfilesystem CR
func (c *Controller) OnServicesChange(_ string, service *corev1.Service) (*corev1.Service, error) {
if service == nil || service.DeletionTimestamp != nil {
logrus.Infof("Skip this round because service is deleted or deleting")
return nil, nil
}

// we only care about the endpoint with name prefix "pvc-"
if !strings.HasPrefix(service.Name, "pvc-") {
return nil, nil
}

logrus.Infof("Handling service %s change event", service.Name)
networkFS, err := c.NetworkFilsystems.Get(c.namespace, service.Name, metav1.GetOptions{})
if err != nil {
logrus.Errorf("Failed to get networkFS %s: %v", service.Name, err)
return nil, err
}

// only update when the networkfilesystem is enabled.
if networkFS.Spec.DesiredState != networkfsv1.NetworkFSStateEnabled {
logrus.Infof("Skip update with endpoint change event because networkfilesystem %s is not enabled", networkFS.Name)
return nil, nil
}

// means we depends on the endpoint, skip the update
if service.Spec.ClusterIP == corev1.ClusterIPNone {
return nil, nil
}
networkFSCpy := networkFS.DeepCopy()
// Not updated, skip the update (we will get a new service CR later)
if service.Spec.ClusterIP == "" {
networkFSCpy.Status.Endpoint = ""
networkFSCpy.Status.Status = networkfsv1.EndpointStatusNotReady
networkFSCpy.Status.Type = networkfsv1.NetworkFSTypeNFS
networkFSCpy.Status.State = networkfsv1.NetworkFSStateEnabling
conds := networkfsv1.NetworkFSCondition{
Type: networkfsv1.ConditionTypeNotReady,
Status: corev1.ConditionTrue,
LastTransitionTime: metav1.Now(),
Reason: "Service is not ready",
Message: "Service did not contain the corresponding address",
}
networkFSCpy.Status.NetworkFSConds = utils.UpdateNetworkFSConds(networkFSCpy.Status.NetworkFSConds, conds)
} else {
if networkFSCpy.Status.Endpoint != service.Spec.ClusterIP {
changedMsg := "Endpoint address is initialized with " + service.Spec.ClusterIP
if networkFSCpy.Status.Endpoint != "" {
changedMsg = "Endpoint address is changed, previous address is " + networkFSCpy.Status.Endpoint
}
conds := networkfsv1.NetworkFSCondition{
Type: networkfsv1.ConditionTypeEndpointChanged,
Status: corev1.ConditionTrue,
LastTransitionTime: metav1.Now(),
Reason: "Endpoint is changed",
Message: changedMsg,
}
networkFSCpy.Status.NetworkFSConds = utils.UpdateNetworkFSConds(networkFSCpy.Status.NetworkFSConds, conds)
}
networkFSCpy.Status.Endpoint = service.Spec.ClusterIP
networkFSCpy.Status.Status = networkfsv1.EndpointStatusReady
networkFSCpy.Status.Type = networkfsv1.NetworkFSTypeNFS
networkFSCpy.Status.State = networkfsv1.NetworkFSStateEnabling
conds := networkfsv1.NetworkFSCondition{
Type: networkfsv1.ConditionTypeReady,
Status: corev1.ConditionTrue,
LastTransitionTime: metav1.Now(),
Reason: "Endpoint is ready",
Message: "Endpoint contains the corresponding address",
}
networkFSCpy.Status.NetworkFSConds = utils.UpdateNetworkFSConds(networkFSCpy.Status.NetworkFSConds, conds)
}

if !reflect.DeepEqual(networkFS, networkFSCpy) {
if _, err := c.NetworkFilsystems.UpdateStatus(networkFSCpy); err != nil {
logrus.Errorf("Failed to update networkFS %s: %v", networkFS.Name, err)
return nil, err
}
}

return nil, nil
}
6 changes: 5 additions & 1 deletion pkg/controller/sharemanager/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,14 @@ func (c *Controller) OnShareManagerChange(_ string, sharemanager *longhornv1.Sha

logrus.Infof("Handling sharemanager %s change event", sharemanager.Name)
networkFS, err := c.NetworkFilsystems.Get(c.namespace, sharemanager.Name, metav1.GetOptions{})
if err != nil && apierrors.IsNotFound(err) {
if err != nil && !apierrors.IsNotFound(err) {
logrus.Errorf("Failed to get networkFS %s: %v", sharemanager.Name, err)
return nil, err
}
if apierrors.IsNotFound(err) {
logrus.Infof("Skip update with sharemanager change event because there is no corresponding NetworkFS %v", sharemanager.Name)
return nil, nil
}

// already disabled, return
if networkFS.Status.State == networkfsv1.NetworkFSStateDisabled {
Expand Down

0 comments on commit 59f23e8

Please sign in to comment.