Skip to content

Commit

Permalink
multi-replicas support
Browse files Browse the repository at this point in the history
  • Loading branch information
apetruhin committed Dec 12, 2024
1 parent b63f911 commit ad34044
Show file tree
Hide file tree
Showing 12 changed files with 175 additions and 95 deletions.
10 changes: 8 additions & 2 deletions api/v1/coroot_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ type ExternalClickhouseSpec struct {
Database string `json:"database,omitempty"`
}

type PostgresSpec struct {
ConnectionString string `json:"connectionString,omitempty"`
}

type CorootSpec struct {
ApiKey string `json:"apiKey,omitempty"`
MetricsRefreshInterval metav1.Duration `json:"metricsRefreshInterval,omitempty"`
Expand All @@ -97,8 +101,8 @@ type CorootSpec struct {
EnterpriseEdition *EnterpriseEditionSpec `json:"enterpriseEdition,omitempty"`
AgentsOnly *AgentsOnlySpec `json:"agentsOnly,omitempty"`

Service ServiceSpec `json:"service,omitempty"`

Replicas int `json:"replicas,omitempty"`
Service ServiceSpec `json:"service,omitempty"`
Affinity *corev1.Affinity `json:"affinity,omitempty"`
Storage StorageSpec `json:"storage,omitempty"`
Resources corev1.ResourceRequirements `json:"resources,omitempty"`
Expand All @@ -110,6 +114,8 @@ type CorootSpec struct {

Clickhouse ClickhouseSpec `json:"clickhouse,omitempty"`
ExternalClickhouse *ExternalClickhouseSpec `json:"externalClickhouse,omitempty"`

Postgres *PostgresSpec `json:"postgres,omitempty"`
}

type CorootStatus struct { // TODO
Expand Down
20 changes: 20 additions & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions config/crd/coroot.com_coroots.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5405,6 +5405,11 @@ spec:
version:
type: string
type: object
postgres:
properties:
connectionString:
type: string
type: object
prometheus:
properties:
affinity:
Expand Down Expand Up @@ -6406,6 +6411,8 @@ spec:
x-kubernetes-int-or-string: true
type: object
type: object
replicas:
type: integer
resources:
description: ResourceRequirements describes the compute resource requirements.
properties:
Expand Down
1 change: 1 addition & 0 deletions config/samples/coroot.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ spec:
# licenseKey:
# agentsOnly:
# corootUrl:
# replicas: 1
nodeAgent:
clusterAgent:
prometheus:
Expand Down
4 changes: 0 additions & 4 deletions controller/clickhouse_keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,6 @@ func (r *CorootReconciler) clickhouseKeeperStatefulSet(cr *corootv1.Coroot) *app
}

replicas := int32(ClickhouseKeeperReplicas)
storageSize := cr.Spec.Clickhouse.Keeper.Storage.Size
if storageSize.IsZero() {
storageSize, _ = resource.ParseQuantity("10Gi")
}
ss.Spec = appsv1.StatefulSetSpec{
Selector: &metav1.LabelSelector{
MatchLabels: ls,
Expand Down
39 changes: 26 additions & 13 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
"sync"
"time"
)
Expand All @@ -34,6 +33,8 @@ type CorootReconciler struct {

versions map[App]string
versionsLock sync.Mutex

deploymentDeleted bool
}

func NewCorootReconciler(mgr ctrl.Manager) *CorootReconciler {
Expand Down Expand Up @@ -75,21 +76,23 @@ func NewCorootReconciler(mgr ctrl.Manager) *CorootReconciler {
// +kubebuilder:rbac:groups=security.openshift.io,resources=securitycontextconstraints,verbs=use

func (r *CorootReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
logger := ctrl.Log.WithValues("namespace", req.Namespace, "name", req.Name)

cr := &corootv1.Coroot{}
err := r.Get(ctx, req.NamespacedName, cr)
if err != nil {
if errors.IsNotFound(err) {
logger.Info("Coroot has been deleted")
r.instancesLock.Lock()
delete(r.instances, req)
if r.instances[req] {
logger.Info("Coroot has been deleted")
delete(r.instances, req)
cr = &corootv1.Coroot{}
cr.Name = req.Name
cr.Namespace = req.Namespace
_ = r.Delete(ctx, r.clusterAgentClusterRoleBinding(cr))
_ = r.Delete(ctx, r.clusterAgentClusterRole(cr))
}
r.instancesLock.Unlock()
cr = &corootv1.Coroot{}
cr.Name = req.Name
cr.Namespace = req.Namespace
_ = r.Delete(ctx, r.clusterAgentClusterRoleBinding(cr))
_ = r.Delete(ctx, r.clusterAgentClusterRole(cr))
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
Expand All @@ -114,10 +117,20 @@ func (r *CorootReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
return ctrl.Result{}, nil
}

if cr.Spec.Replicas > 1 && (cr.Spec.Postgres == nil || cr.Spec.Postgres.ConnectionString == "") {
logger.Error(fmt.Errorf("Postgres.ConnectionString is empty"), "Coroot requires Postgres to run multiple replicas (will run only one replica)")
cr.Spec.Replicas = 1
}
r.CreateOrUpdateServiceAccount(ctx, cr, "coroot", sccNonroot)
r.CreateOrUpdatePVC(ctx, cr, r.corootPVC(cr))
r.CreateOrUpdateDeployment(ctx, cr, r.corootDeployment(cr))
for _, pvc := range r.corootPVCs(cr) {
r.CreateOrUpdatePVC(ctx, cr, pvc)
}
r.CreateOrUpdateStatefulSet(ctx, cr, r.corootStatefulSet(cr))
r.CreateOrUpdateService(ctx, cr, r.corootService(cr))
if !r.deploymentDeleted {
_ = r.Delete(ctx, r.corootDeployment(cr))
r.deploymentDeleted = true
}

r.CreateOrUpdateServiceAccount(ctx, cr, "prometheus", sccNonroot)
r.CreateOrUpdatePVC(ctx, cr, r.prometheusPVC(cr))
Expand All @@ -128,11 +141,11 @@ func (r *CorootReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
r.CreateSecret(ctx, cr, r.clickhouseSecret(cr))

r.CreateOrUpdateServiceAccount(ctx, cr, "clickhouse-keeper", sccNonroot)
r.CreateOrUpdateService(ctx, cr, r.clickhouseKeeperServiceHeadless(cr))
for _, pvc := range r.clickhouseKeeperPVCs(cr) {
r.CreateOrUpdatePVC(ctx, cr, pvc)
}
r.CreateOrUpdateStatefulSet(ctx, cr, r.clickhouseKeeperStatefulSet(cr))
r.CreateOrUpdateService(ctx, cr, r.clickhouseKeeperServiceHeadless(cr))

r.CreateOrUpdateServiceAccount(ctx, cr, "clickhouse", sccNonroot)
r.CreateOrUpdateService(ctx, cr, r.clickhouseServiceHeadless(cr))
Expand All @@ -151,7 +164,7 @@ func (r *CorootReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
}

func (r *CorootReconciler) CreateOrUpdate(ctx context.Context, cr *corootv1.Coroot, obj client.Object, f controllerutil.MutateFn) {
logger := log.FromContext(nil, "type", fmt.Sprintf("%T", obj), "name", obj.GetName(), "namespace", obj.GetNamespace())
logger := ctrl.Log.WithValues("type", fmt.Sprintf("%T", obj), "name", obj.GetName(), "namespace", obj.GetNamespace())
_ = ctrl.SetControllerReference(cr, obj, r.Scheme)
errMsg := "failed to create or update"
if f == nil {
Expand Down
107 changes: 65 additions & 42 deletions controller/coroot.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,41 @@ func (r *CorootReconciler) corootService(cr *corootv1.Coroot) *corev1.Service {
return s
}

func (r *CorootReconciler) corootPVCs(cr *corootv1.Coroot) []*corev1.PersistentVolumeClaim {
ls := Labels(cr, "coroot")

size := cr.Spec.Storage.Size
if size.IsZero() {
size, _ = resource.ParseQuantity("10Gi")
}
replicas := cr.Spec.Replicas
if replicas == 0 {
replicas = 1
}

var res []*corev1.PersistentVolumeClaim
for replica := 0; replica < replicas; replica++ {
pvc := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("data-%s-coroot-%d", cr.Name, replica),
Namespace: cr.Namespace,
Labels: ls,
},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
Resources: corev1.VolumeResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: size,
},
},
StorageClassName: cr.Spec.Storage.ClassName,
},
}
res = append(res, pvc)
}
return res
}

func (r *CorootReconciler) corootDeployment(cr *corootv1.Coroot) *appsv1.Deployment {
ls := Labels(cr, "coroot")
d := &appsv1.Deployment{
Expand All @@ -52,6 +87,18 @@ func (r *CorootReconciler) corootDeployment(cr *corootv1.Coroot) *appsv1.Deploym
Labels: ls,
},
}
return d
}

func (r *CorootReconciler) corootStatefulSet(cr *corootv1.Coroot) *appsv1.StatefulSet {
ls := Labels(cr, "coroot")
ss := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: cr.Name + "-coroot",
Namespace: cr.Namespace,
Labels: ls,
},
}

refreshInterval := cr.Spec.MetricsRefreshInterval.Duration.String()
if cr.Spec.MetricsRefreshInterval.Duration == 0 {
Expand All @@ -61,7 +108,6 @@ func (r *CorootReconciler) corootDeployment(cr *corootv1.Coroot) *appsv1.Deploym
env := []corev1.EnvVar{
{Name: "GLOBAL_REFRESH_INTERVAL", Value: refreshInterval},
{Name: "GLOBAL_PROMETHEUS_URL", Value: fmt.Sprintf("http://%s-prometheus.%s:9090", cr.Name, cr.Namespace)},
{Name: "DO_NOT_CHECK_FOR_UPDATES", Value: "1"},
{Name: "INSTALLATION_TYPE", Value: "k8s-operator"},
}
if cr.Spec.CacheTTL.Duration > 0 {
Expand Down Expand Up @@ -111,13 +157,26 @@ func (r *CorootReconciler) corootDeployment(cr *corootv1.Coroot) *appsv1.Deploym
)
}

d.Spec = appsv1.DeploymentSpec{
if cr.Spec.Postgres != nil && cr.Spec.Postgres.ConnectionString != "" {
env = append(env, corev1.EnvVar{Name: "PG_CONNECTION_STRING", Value: cr.Spec.Postgres.ConnectionString})
}

replicas := int32(cr.Spec.Replicas)
if replicas <= 0 {
replicas = 1
}

ss.Spec = appsv1.StatefulSetSpec{
Selector: &metav1.LabelSelector{
MatchLabels: ls,
},
Strategy: appsv1.DeploymentStrategy{
Type: appsv1.RecreateDeploymentStrategyType,
},
Replicas: &replicas,
VolumeClaimTemplates: []corev1.PersistentVolumeClaim{{
ObjectMeta: metav1.ObjectMeta{
Name: "data",
Namespace: cr.Namespace,
},
}},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: ls,
Expand Down Expand Up @@ -149,45 +208,9 @@ func (r *CorootReconciler) corootDeployment(cr *corootv1.Coroot) *appsv1.Deploym
},
},
},
Volumes: []corev1.Volume{
{
Name: "data",
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: "data-" + cr.Name + "-coroot",
},
},
},
},
},
},
}

return d
}

func (r *CorootReconciler) corootPVC(cr *corootv1.Coroot) *corev1.PersistentVolumeClaim {
pvc := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: "data-" + cr.Name + "-coroot",
Namespace: cr.Namespace,
Labels: Labels(cr, "coroot"),
},
}

size := cr.Spec.Storage.Size
if size.IsZero() {
size, _ = resource.ParseQuantity("10Gi")
}
pvc.Spec = corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
Resources: corev1.VolumeResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: size,
},
},
StorageClassName: cr.Spec.Storage.ClassName,
}

return pvc
return ss
}
18 changes: 17 additions & 1 deletion controller/node_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
corootv1 "github.io/coroot/operator/api/v1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
)
Expand Down Expand Up @@ -35,6 +36,21 @@ func (r *CorootReconciler) nodeAgentDaemonSet(cr *corootv1.Coroot) *appsv1.Daemo
for _, e := range cr.Spec.NodeAgent.Env {
env = append(env, e)
}

resources := cr.Spec.NodeAgent.Resources
if resources.Requests == nil {
resources.Requests = corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("100m"),
corev1.ResourceMemory: resource.MustParse("200Mi"),
}
}
if resources.Limits == nil {
resources.Limits = corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("500m"),
corev1.ResourceMemory: resource.MustParse("1Gi"),
}
}

ds.Spec = appsv1.DaemonSetSpec{
Selector: &metav1.LabelSelector{
MatchLabels: ls,
Expand All @@ -59,7 +75,7 @@ func (r *CorootReconciler) nodeAgentDaemonSet(cr *corootv1.Coroot) *appsv1.Daemo
},
SecurityContext: &corev1.SecurityContext{Privileged: ptr.To(true)},
Env: env,
Resources: cr.Spec.NodeAgent.Resources,
Resources: resources,
VolumeMounts: []corev1.VolumeMount{
{Name: "cgroupfs", MountPath: "/host/sys/fs/cgroup", ReadOnly: true},
{Name: "tracefs", MountPath: "/sys/kernel/tracing"},
Expand Down
Loading

0 comments on commit ad34044

Please sign in to comment.