Add PVC syncing support #179

Open · wants to merge 1 commit into base: main
3 changes: 2 additions & 1 deletion cli/cmds/kubeconfig/kubeconfig.go
@@ -11,6 +11,7 @@ import (
"github.com/rancher/k3k/cli/cmds"
"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
"github.com/rancher/k3k/pkg/controller"
"github.com/rancher/k3k/pkg/controller/certs"
"github.com/rancher/k3k/pkg/controller/kubeconfig"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
@@ -122,7 +123,7 @@ func generate(clx *cli.Context) error {
	}
	host := strings.Split(url.Host, ":")

-	certAltNames := kubeconfig.AddSANs(altNames)
+	certAltNames := certs.AddSANs(altNames)
	if org == nil {
		org = cli.StringSlice{user.SystemPrivilegedGroup}
	}
125 changes: 125 additions & 0 deletions k3k-kubelet/controller/persistentvolumeclaims.go
@@ -0,0 +1,125 @@
package controller

import (
	"context"

	"github.com/rancher/k3k/k3k-kubelet/translate"
	"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
	"github.com/rancher/k3k/pkg/log"
	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	ctrl "sigs.k8s.io/controller-runtime"
	ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

const (
	pvcController    = "pvc-syncer-controller"
	pvcFinalizerName = "pv.k3k.io/finalizer"
Review comment (Collaborator): Should this be pvc.k3k.io/finalizer?

Suggested change:
-	pvcFinalizerName = "pv.k3k.io/finalizer"
+	pvcFinalizerName = "pvc.k3k.io/finalizer"

)

type PVCReconciler struct {
	virtualClient    ctrlruntimeclient.Client
	hostClient       ctrlruntimeclient.Client
	clusterName      string
	clusterNamespace string
	Scheme           *runtime.Scheme
	HostScheme       *runtime.Scheme
	logger           *log.Logger
	Translater       translate.ToHostTranslater
	//objs sets.Set[types.NamespacedName]
Review comment (Collaborator): Can we drop the comment?

Suggested change:
-	//objs sets.Set[types.NamespacedName]

}

// AddPVCSyncer adds persistentvolumeclaims syncer controller to k3k-kubelet
func AddPVCSyncer(ctx context.Context, virtMgr, hostMgr manager.Manager, clusterName, clusterNamespace string, logger *log.Logger) error {
	translater := translate.ToHostTranslater{
		ClusterName:      clusterName,
		ClusterNamespace: clusterNamespace,
	}
	// initialize a new Reconciler
	reconciler := PVCReconciler{
		virtualClient:    virtMgr.GetClient(),
		hostClient:       hostMgr.GetClient(),
		Scheme:           virtMgr.GetScheme(),
		HostScheme:       hostMgr.GetScheme(),
		logger:           logger.Named(pvcController),
		Translater:       translater,
		clusterName:      clusterName,
		clusterNamespace: clusterNamespace,
	}
	return ctrl.NewControllerManagedBy(virtMgr).
		For(&v1.PersistentVolumeClaim{}).
		WithOptions(controller.Options{
			MaxConcurrentReconciles: maxConcurrentReconciles,
		}).
		Complete(&reconciler)
}

func (v *PVCReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
Review comment (Collaborator): This is probably a nit, but I had some issues understanding what v was. Maybe an r for Reconciler is clearer?

	log := v.logger.With("Cluster", v.clusterName, "PersistentVolumeClaim", req.NamespacedName)
	var (
		virtPVC v1.PersistentVolumeClaim
		hostPVC v1.PersistentVolumeClaim
		cluster v1alpha1.Cluster
	)
	if err := v.hostClient.Get(ctx, types.NamespacedName{Name: v.clusterName, Namespace: v.clusterNamespace}, &cluster); err != nil {
		return reconcile.Result{}, err
	}

	// handle persistent volume claim sync
	if err := v.virtualClient.Get(ctx, req.NamespacedName, &virtPVC); err != nil {
		return reconcile.Result{}, ctrlruntimeclient.IgnoreNotFound(err)
	}
	syncedPVC := v.pvc(&virtPVC)
	if err := controllerutil.SetControllerReference(&cluster, syncedPVC, v.HostScheme); err != nil {
		return reconcile.Result{}, err
	}
	// handle deletion
	if !virtPVC.DeletionTimestamp.IsZero() {
		// delete the synced PVC if it exists on the host
		if err := v.hostClient.Delete(ctx, syncedPVC); err != nil {
			return reconcile.Result{}, ctrlruntimeclient.IgnoreNotFound(err)
		}
Review comment (Collaborator), on lines +84 to +87: Hm, I think this could lead to some orphaned resource, i.e. what if the syncedPVC does not exist? This will error, and it will not be requeued because of the IgnoreNotFound. Maybe it's better to return only if the error is a different one. In case of ErrNotFound we should continue the deletion of the virtual PVC.
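
A minimal sketch of the pattern this comment describes, reusing the names from the surrounding diff (a suggestion only, not an authoritative fix):

	// handle deletion: tolerate a missing host PVC so the finalizer
	// on the virtual PVC still gets removed
	if !virtPVC.DeletionTimestamp.IsZero() {
		if err := v.hostClient.Delete(ctx, syncedPVC); err != nil && !apierrors.IsNotFound(err) {
			// a real error: surface it so the request is requeued
			return reconcile.Result{}, err
		}
		// on NotFound we fall through and remove the finalizer below
	}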

		// remove the finalizer after cleaning up the synced PVC
		if controllerutil.ContainsFinalizer(&virtPVC, pvcFinalizerName) {
			controllerutil.RemoveFinalizer(&virtPVC, pvcFinalizerName)
			if err := v.virtualClient.Update(ctx, &virtPVC); err != nil {
				return reconcile.Result{}, err
			}
		}
Review comment (Collaborator), on lines +89 to +94: This can be shortened using only the RemoveFinalizer func. It does the same logic of checking the finalizer, and it reports whether the finalizer list was updated:

Suggested change:
-	if controllerutil.ContainsFinalizer(&virtPVC, pvcFinalizerName) {
-		controllerutil.RemoveFinalizer(&virtPVC, pvcFinalizerName)
-		if err := v.virtualClient.Update(ctx, &virtPVC); err != nil {
-			return reconcile.Result{}, err
-		}
-	}
+	if controllerutil.RemoveFinalizer(&virtPVC, pvcFinalizerName) {
+		if err := v.virtualClient.Update(ctx, &virtPVC); err != nil {
+			return reconcile.Result{}, err
+		}
+	}

ContainsFinalizer is probably useful when you need to check for existence and run some logic before actually removing the finalizer.

		return reconcile.Result{}, nil
	}

	// Add finalizer if it does not exist
	if !controllerutil.ContainsFinalizer(&virtPVC, pvcFinalizerName) {
		controllerutil.AddFinalizer(&virtPVC, pvcFinalizerName)
		if err := v.virtualClient.Update(ctx, &virtPVC); err != nil {
			return reconcile.Result{}, err
		}
	}
Review comment (Collaborator), on lines +100 to +106: Same as before:

Suggested change:
-	// Add finalizer if it does not exist
-	if !controllerutil.ContainsFinalizer(&virtPVC, pvcFinalizerName) {
-		controllerutil.AddFinalizer(&virtPVC, pvcFinalizerName)
-		if err := v.virtualClient.Update(ctx, &virtPVC); err != nil {
-			return reconcile.Result{}, err
-		}
-	}
+	// Add finalizer if it does not exist
+	if controllerutil.AddFinalizer(&virtPVC, pvcFinalizerName) {
+		if err := v.virtualClient.Update(ctx, &virtPVC); err != nil {
+			return reconcile.Result{}, err
+		}
+	}

	// create or update the pv on host

Review comment (Collaborator):

Suggested change:
-	// create or update the pv on host
+	// create or update the pvc on host

	if err := v.hostClient.Get(ctx, types.NamespacedName{Name: syncedPVC.Name, Namespace: v.clusterNamespace}, &hostPVC); err != nil {
		if apierrors.IsNotFound(err) {
			log.Info("creating the persistent volume claim for the first time on the host cluster")
			return reconcile.Result{}, v.hostClient.Create(ctx, syncedPVC)
		}
		return reconcile.Result{}, err
	}
	log.Info("updating pvc on the host cluster")
	return reconcile.Result{}, v.hostClient.Update(ctx, syncedPVC)
}

func (v *PVCReconciler) pvc(obj *v1.PersistentVolumeClaim) *v1.PersistentVolumeClaim {
	hostPVC := obj.DeepCopy()
	v.Translater.TranslateTo(hostPVC)
	// don't sync finalizers to the host
Review comment (Collaborator): Does this mean we are still missing the removal of the finalizers here?
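
If that is the intent, a minimal sketch of stripping them in this helper (hypothetical, not part of the PR):

	// drop virtual-cluster finalizers from the copy that is synced to the
	// host, so the host object is never blocked by them
	hostPVC.Finalizers = nil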

	return hostPVC
}
120 changes: 120 additions & 0 deletions k3k-kubelet/controller/webhook/pod.go
@@ -0,0 +1,120 @@
package webhook

import (
	"context"
	"errors"
	"fmt"

	"github.com/rancher/k3k/pkg/controller/cluster/agent"
	"github.com/rancher/k3k/pkg/log"
	admissionregistrationv1 "k8s.io/api/admissionregistration/v1"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/utils/ptr"
	ctrl "sigs.k8s.io/controller-runtime"
	ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

const (
	webhookName    = "nodename.podmutator.k3k.io"
	webhookTimeout = int32(10)
	webhookPort    = "9443"
	webhookPath    = "/mutate--v1-pod"
Review comment (Collaborator): Is the -- a convention?
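
For context, an assumption based on controller-runtime's builder conventions rather than anything stated in this PR: the default webhook path is derived from the object's GroupVersionKind as "/mutate-<group>-<version>-<kind>", and the core API group is the empty string, which produces the double dash. A runnable sketch of that derivation (defaultMutatePath is a hypothetical helper):

	package main

	import (
		"fmt"
		"strings"
	)

	// defaultMutatePath mirrors the assumed controller-runtime convention:
	// dots in the group are replaced with dashes, the kind is lowercased.
	func defaultMutatePath(group, version, kind string) string {
		return "/mutate-" + strings.ReplaceAll(group, ".", "-") +
			"-" + version + "-" + strings.ToLower(kind)
	}

	func main() {
		fmt.Println(defaultMutatePath("", "v1", "Pod")) // prints /mutate--v1-pod
	}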

)

type webhookHandler struct {
	client           ctrlruntimeclient.Client
	scheme           *runtime.Scheme
	nodeName         string
	clusterName      string
	clusterNamespace string
	logger           *log.Logger
}

// AddPodMutatorWebhook will add a mutator webhook to the virtual cluster to
// modify the nodeName of the created pods with the name of the virtual kubelet node name
func AddPodMutatorWebhook(ctx context.Context, mgr manager.Manager, hostClient ctrlruntimeclient.Client, clusterName, clusterNamespace, nodeName string, logger *log.Logger) error {
	handler := webhookHandler{
		client:           mgr.GetClient(),
		scheme:           mgr.GetScheme(),
		logger:           logger,
		clusterName:      clusterName,
		clusterNamespace: clusterNamespace,
		nodeName:         nodeName,
	}

	// create the mutator webhook configuration on the cluster
	config, err := handler.configuration(ctx, hostClient)
	if err != nil {
		return err
	}
	if err := handler.client.Create(ctx, config); err != nil {
		return err
	}
	// register the webhook with the manager
	return ctrl.NewWebhookManagedBy(mgr).For(&v1.Pod{}).WithDefaulter(&handler).Complete()
}

func (w *webhookHandler) Default(ctx context.Context, obj runtime.Object) error {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		return fmt.Errorf("invalid request: object was type %T, not a pod", obj)
	}
	w.logger.Infow("received request", "Pod", pod.Name, "Namespace", pod.Namespace)
	if pod.Spec.NodeName == "" {
		pod.Spec.NodeName = w.nodeName
	}
	return nil
}

func (w *webhookHandler) configuration(ctx context.Context, hostClient ctrlruntimeclient.Client) (*admissionregistrationv1.MutatingWebhookConfiguration, error) {
	w.logger.Infow("extracting webhook tls from host cluster")
	var (
		webhookTLSSecret v1.Secret
	)
	if err := hostClient.Get(ctx, types.NamespacedName{Name: agent.WebhookSecretName(w.clusterName), Namespace: w.clusterNamespace}, &webhookTLSSecret); err != nil {
		return nil, err
	}
	caBundle, ok := webhookTLSSecret.Data["ca.crt"]
	if !ok {
		return nil, errors.New("webhook CABundle does not exist in secret")
	}
	webhookURL := "https://" + w.nodeName + ":" + webhookPort + webhookPath
	return &admissionregistrationv1.MutatingWebhookConfiguration{
		TypeMeta: metav1.TypeMeta{
			APIVersion: "admissionregistration.k8s.io/v1",
			Kind:       "MutatingWebhookConfiguration",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name: webhookName + "-configuration",
		},
		Webhooks: []admissionregistrationv1.MutatingWebhook{
			{
				Name:                    webhookName,
				AdmissionReviewVersions: []string{"v1"},
				SideEffects:             ptr.To(admissionregistrationv1.SideEffectClassNone),
				TimeoutSeconds:          ptr.To(webhookTimeout),
				ClientConfig: admissionregistrationv1.WebhookClientConfig{
					URL:      ptr.To(webhookURL),
					CABundle: caBundle,
				},
				Rules: []admissionregistrationv1.RuleWithOperations{
					{
						Operations: []admissionregistrationv1.OperationType{
							"CREATE",
						},
						Rule: admissionregistrationv1.Rule{
							APIGroups:   []string{""},
							APIVersions: []string{"v1"},
							Resources:   []string{"pods"},
							Scope:       ptr.To(admissionregistrationv1.NamespacedScope),
						},
					},
				},
			},
		},
	}, nil
}
23 changes: 19 additions & 4 deletions k3k-kubelet/kubelet.go
@@ -12,12 +12,13 @@ import (

certutil "github.com/rancher/dynamiclistener/cert"
k3kkubeletcontroller "github.com/rancher/k3k/k3k-kubelet/controller"
k3kwebhook "github.com/rancher/k3k/k3k-kubelet/controller/webhook"
"github.com/rancher/k3k/k3k-kubelet/provider"
"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
"github.com/rancher/k3k/pkg/controller"
"github.com/rancher/k3k/pkg/controller/certs"
"github.com/rancher/k3k/pkg/controller/cluster/server"
"github.com/rancher/k3k/pkg/controller/cluster/server/bootstrap"
"github.com/rancher/k3k/pkg/controller/kubeconfig"
k3klog "github.com/rancher/k3k/pkg/log"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/node"
@@ -38,6 +39,7 @@ import (
	ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	ctrlserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
+	"sigs.k8s.io/controller-runtime/pkg/webhook"
)

var (
@@ -111,21 +113,34 @@ func newKubelet(ctx context.Context, c *config, logger *k3klog.Logger) (*kubelet
	if err != nil {
		return nil, errors.New("unable to add client go types to virtual cluster scheme: " + err.Error())
	}
+	webhookServer := webhook.NewServer(webhook.Options{
+		CertDir: "/opt/rancher/k3k-webhook",
+	})
	virtualMgr, err := ctrl.NewManager(virtConfig, manager.Options{
-		Scheme: virtualScheme,
+		Scheme:        virtualScheme,
+		WebhookServer: webhookServer,
		Metrics: ctrlserver.Options{
			BindAddress: ":8084",
		},
	})
	if err != nil {
		return nil, errors.New("unable to create controller-runtime mgr for virtual cluster: " + err.Error())
	}
+	logger.Info("adding pod mutator webhook")
+	if err := k3kwebhook.AddPodMutatorWebhook(ctx, virtualMgr, hostClient, c.ClusterName, c.ClusterNamespace, c.NodeName, logger); err != nil {
+		return nil, errors.New("unable to add pod mutator webhook for virtual cluster: " + err.Error())
+	}

	logger.Info("adding service syncer controller")
	if err := k3kkubeletcontroller.AddServiceSyncer(ctx, virtualMgr, hostMgr, c.ClusterName, c.ClusterNamespace, k3klog.New(false)); err != nil {
		return nil, errors.New("failed to add service syncer controller: " + err.Error())
	}

+	logger.Info("adding pvc syncer controller")
+	if err := k3kkubeletcontroller.AddPVCSyncer(ctx, virtualMgr, hostMgr, c.ClusterName, c.ClusterNamespace, k3klog.New(false)); err != nil {
+		return nil, errors.New("failed to add pvc syncer controller: " + err.Error())
+	}

	clusterIP, err := clusterIP(ctx, c.AgentHostname, c.ClusterNamespace, hostClient)
	if err != nil {
		return nil, errors.New("failed to extract the clusterIP for the server service: " + err.Error())
@@ -270,7 +285,7 @@ func virtRestConfig(ctx context.Context, virtualConfigPath string, hostClient ct
	}); err != nil {
		return nil, errors.New("unable to decode bootstrap: " + err.Error())
	}
-	adminCert, adminKey, err := kubeconfig.CreateClientCertKey(
+	adminCert, adminKey, err := certs.CreateClientCertKey(
		controller.AdminCommonName, []string{user.SystemPrivilegedGroup},
		nil, []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth}, time.Hour*24*time.Duration(356),
		b.ClientCA.Content,
@@ -333,7 +348,7 @@ func loadTLSConfig(ctx context.Context, hostClient ctrlruntimeclient.Client, clu
		DNSNames: []string{hostname},
		IPs:      []net.IP{ip},
	}
-	cert, key, err := kubeconfig.CreateClientCertKey(nodeName, nil, &altNames, []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, 0, b.ServerCA.Content, b.ServerCAKey.Content)
+	cert, key, err := certs.CreateClientCertKey(nodeName, nil, &altNames, []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, 0, b.ServerCA.Content, b.ServerCAKey.Content)
	if err != nil {
		return nil, errors.New("unable to get cert and key: " + err.Error())
	}
2 changes: 2 additions & 0 deletions k3k-kubelet/provider/provider.go
@@ -393,6 +393,8 @@ func (p *Provider) transformVolumes(ctx context.Context, podNamespace string, vo
}
}
}
+	} else if volume.PersistentVolumeClaim != nil {
+		volume.PersistentVolumeClaim.ClaimName = p.Translater.TranslateName(podNamespace, volume.PersistentVolumeClaim.ClaimName)
}
}
return nil
@@ -1,4 +1,4 @@
-package kubeconfig
+package certs

import (
	"crypto"
4 changes: 2 additions & 2 deletions pkg/controller/cluster/agent/agent.go
@@ -12,8 +12,8 @@ const (

type Agent interface {
Name() string
Config() (ctrlruntimeclient.Object, error)
Resources() []ctrlruntimeclient.Object
Config() ctrlruntimeclient.Object
Resources() ([]ctrlruntimeclient.Object, error)
}

func New(cluster *v1alpha1.Cluster, serviceIP, sharedAgentImage, token string) Agent {