Skip to content

Commit

Permalink
controller: manually manage PVCs for StatefulSets
Browse files Browse the repository at this point in the history
  • Loading branch information
apetruhin committed Oct 3, 2024
1 parent eeb30ff commit cffefa1
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 31 deletions.
61 changes: 48 additions & 13 deletions controller/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,46 @@ func (r *CorootReconciler) clickhouseServiceHeadless(cr *corootv1.Coroot) *corev
return s
}

func (r *CorootReconciler) clickhousePVCs(cr *corootv1.Coroot) []*corev1.PersistentVolumeClaim {
ls := Labels(cr, "clickhouse")
shards := cr.Spec.Clickhouse.Shards
if shards == 0 {
shards = 1
}
replicas := cr.Spec.Clickhouse.Replicas
if replicas == 0 {
replicas = 1
}
size := cr.Spec.Clickhouse.Storage.Size
if size.IsZero() {
size, _ = resource.ParseQuantity("100Gi")
}

var res []*corev1.PersistentVolumeClaim
for shard := 0; shard < shards; shard++ {
for replica := 0; replica < replicas; replica++ {
pvc := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("data-%s-clickhouse-shard-%d-%d", cr.Name, shard, replica),
Namespace: cr.Namespace,
Labels: ls,
},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
Resources: corev1.VolumeResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: size,
},
},
StorageClassName: cr.Spec.Storage.ClassName,
},
}
res = append(res, pvc)
}
}
return res
}

func (r *CorootReconciler) clickhouseStatefulSets(cr *corootv1.Coroot) []*appsv1.StatefulSet {
ls := Labels(cr, "clickhouse")

Expand All @@ -107,10 +147,6 @@ func (r *CorootReconciler) clickhouseStatefulSets(cr *corootv1.Coroot) []*appsv1
if replicas == 0 {
replicas = 1
}
storageSize := cr.Spec.Clickhouse.Storage.Size
if storageSize.IsZero() {
storageSize, _ = resource.ParseQuantity("100Gi")
}

var res []*appsv1.StatefulSet
for shard := 0; shard < shards; shard++ {
Expand All @@ -134,15 +170,6 @@ func (r *CorootReconciler) clickhouseStatefulSets(cr *corootv1.Coroot) []*appsv1
Name: "data",
Namespace: cr.Namespace,
},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
Resources: corev1.VolumeResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: storageSize,
},
},
StorageClassName: cr.Spec.Clickhouse.Storage.ClassName,
},
},
},
Template: corev1.PodTemplateSpec{
Expand Down Expand Up @@ -209,6 +236,14 @@ func (r *CorootReconciler) clickhouseStatefulSets(cr *corootv1.Coroot) []*appsv1
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
},
//{
// Name: "data",
// VolumeSource: corev1.VolumeSource{
// PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
// ClaimName: fmt.Sprintf("data-%s-clickhouse-shard-%d", cr.Name, shard),
// },
// },
//},
},
},
},
Expand Down
26 changes: 17 additions & 9 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
corootv1 "github.io/coroot/operator/api/v1"
"golang.org/x/exp/maps"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
Expand Down Expand Up @@ -48,10 +49,11 @@ func NewCorootReconciler(mgr ctrl.Manager) *CorootReconciler {
for range time.Tick(AppVersionsUpdateInterval) {
r.fetchAppVersions()
r.instancesLock.Lock()
for i := range r.instances {
instances := maps.Keys(r.instances)
r.instancesLock.Unlock()
for _, i := range instances {
_, _ = r.Reconcile(nil, i)
}
r.instancesLock.Unlock()
}
}()

Expand Down Expand Up @@ -94,8 +96,6 @@ func (r *CorootReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
r.instances[req] = true
r.instancesLock.Unlock()

logger.Info(fmt.Sprintf("%+v", cr.Spec))

r.CreateOrUpdateDaemonSet(ctx, cr, r.nodeAgentDaemonSet(cr))

r.CreateOrUpdateServiceAccount(ctx, cr, r.clusterAgentServiceAccount(cr))
Expand All @@ -119,10 +119,16 @@ func (r *CorootReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
if cr.Spec.ExternalClickhouse == nil {
r.CreateSecret(ctx, cr, r.clickhouseSecret(cr))

for _, pvc := range r.clickhouseKeeperPVCs(cr) {
r.CreateOrUpdatePVC(ctx, cr, pvc)
}
r.CreateOrUpdateStatefulSet(ctx, cr, r.clickhouseKeeperStatefulSet(cr))
r.CreateOrUpdateService(ctx, cr, r.clickhouseKeeperServiceHeadless(cr))

r.CreateOrUpdateService(ctx, cr, r.clickhouseServiceHeadless(cr))
for _, pvc := range r.clickhousePVCs(cr) {
r.CreateOrUpdatePVC(ctx, cr, pvc)
}
for _, clickhouse := range r.clickhouseStatefulSets(cr) {
r.CreateOrUpdateStatefulSet(ctx, cr, clickhouse)
}
Expand Down Expand Up @@ -171,12 +177,12 @@ func (r *CorootReconciler) CreateOrUpdateDaemonSet(ctx context.Context, cr *coro
}

func (r *CorootReconciler) CreateOrUpdateStatefulSet(ctx context.Context, cr *corootv1.Coroot, ss *appsv1.StatefulSet) {
for i := range ss.Spec.VolumeClaimTemplates {
_ = ctrl.SetControllerReference(cr, &ss.Spec.VolumeClaimTemplates[i], r.Scheme)
}
spec := ss.Spec
r.CreateOrUpdate(ctx, cr, ss, func() error {
return Merge(&ss.Spec, spec)
volumeClaimTemplates := ss.Spec.VolumeClaimTemplates[:]
err := Merge(&ss.Spec, spec)
ss.Spec.VolumeClaimTemplates = volumeClaimTemplates
return err
})
}

Expand All @@ -190,7 +196,9 @@ func (r *CorootReconciler) CreateOrUpdatePVC(ctx context.Context, cr *corootv1.C
func (r *CorootReconciler) CreateOrUpdateService(ctx context.Context, cr *corootv1.Coroot, s *corev1.Service) {
spec := s.Spec
r.CreateOrUpdate(ctx, cr, s, func() error {
return Merge(&s.Spec, spec)
err := Merge(&s.Spec, spec)
s.Spec.Ports = spec.Ports
return err
})
}

Expand Down
49 changes: 40 additions & 9 deletions controller/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,36 @@ func (r *CorootReconciler) clickhouseKeeperServiceHeadless(cr *corootv1.Coroot)
return s
}

func (r *CorootReconciler) clickhouseKeeperPVCs(cr *corootv1.Coroot) []*corev1.PersistentVolumeClaim {
ls := Labels(cr, "clickhouse-keeper")
size := cr.Spec.Clickhouse.Keeper.Storage.Size
if size.IsZero() {
size, _ = resource.ParseQuantity("10Gi")
}

var res []*corev1.PersistentVolumeClaim
for replica := 0; replica < ClickhouseKeeperReplicas; replica++ {
pvc := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("data-%s-clickhouse-keeper-%d", cr.Name, replica),
Namespace: cr.Namespace,
Labels: ls,
},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
Resources: corev1.VolumeResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: size,
},
},
StorageClassName: cr.Spec.Storage.ClassName,
},
}
res = append(res, pvc)
}
return res
}

func (r *CorootReconciler) clickhouseKeeperStatefulSet(cr *corootv1.Coroot) *appsv1.StatefulSet {
ls := Labels(cr, "clickhouse-keeper")
ss := &appsv1.StatefulSet{
Expand All @@ -72,15 +102,16 @@ func (r *CorootReconciler) clickhouseKeeperStatefulSet(cr *corootv1.Coroot) *app
Name: "data",
Namespace: cr.Namespace,
},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
Resources: corev1.VolumeResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: storageSize,
},
},
StorageClassName: cr.Spec.Clickhouse.Keeper.Storage.ClassName,
},
//Spec: corev1.PersistentVolumeClaimSpec{
// VolumeMode: ptr.To(corev1.PersistentVolumeFilesystem),
// AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
// Resources: corev1.VolumeResourceRequirements{
// Requests: corev1.ResourceList{
// corev1.ResourceStorage: storageSize,
// },
// },
// StorageClassName: cr.Spec.Clickhouse.Keeper.Storage.ClassName,
//},
}},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Expand Down
1 change: 1 addition & 0 deletions controller/versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func (r *CorootReconciler) fetchAppVersions() {
}
versions[app] = v
}
logger.Info(fmt.Sprintf("got app versions: %v", versions))
r.versionsLock.Lock()
defer r.versionsLock.Unlock()
for app, v := range versions {
Expand Down

0 comments on commit cffefa1

Please sign in to comment.