Skip to content

Commit

Permalink
Merge pull request #912 from AntonAleksandrov13/main
Browse files Browse the repository at this point in the history
Allow to change MaxConcurrentReconciles via MAX_CONCURRENT_RECONCILES env variable
  • Loading branch information
Zerpet authored Jan 8, 2025
2 parents 8204c1b + a2df60e commit 62f01be
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 6 deletions.
1 change: 1 addition & 0 deletions controllers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ const (
EnableWebhooksEnvVar = "ENABLE_WEBHOOKS"
ControllerSyncPeriodEnvVar = "SYNC_PERIOD"
ConnectUsingPlainHTTPEnvVar = "CONNECT_USING_PLAIN_HTTP"
MaxConcurrentReconciles = "MAX_CONCURRENT_RECONCILES"
)
3 changes: 3 additions & 0 deletions controllers/super_stream_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package controllers
import (
"context"
"fmt"
"sigs.k8s.io/controller-runtime/pkg/controller"
"strconv"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -39,6 +40,7 @@ type SuperStreamReconciler struct {
Recorder record.EventRecorder
RabbitmqClientFactory rabbitmqclient.Factory
KubernetesClusterDomain string
MaxConcurrentReconciles int
}

// +kubebuilder:rbac:groups=rabbitmq.com,resources=exchanges,verbs=get;create;update;patch;delete
Expand Down Expand Up @@ -205,5 +207,6 @@ func (r *SuperStreamReconciler) SetupWithManager(mgr ctrl.Manager) error {
Owns(&topology.Exchange{}).
Owns(&topology.Binding{}).
Owns(&topology.Queue{}).
WithOptions(controller.Options{MaxConcurrentReconciles: r.MaxConcurrentReconciles}).
Complete(r)
}
7 changes: 6 additions & 1 deletion controllers/topology_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"reflect"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"strings"
"time"
Expand All @@ -34,6 +35,7 @@ type TopologyReconciler struct {
RabbitmqClientFactory rabbitmqclient.Factory
KubernetesClusterDomain string
ConnectUsingPlainHTTP bool
MaxConcurrentReconciles int
}

func (r *TopologyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
Expand Down Expand Up @@ -222,9 +224,12 @@ func (r *TopologyReconciler) SetupWithManager(mgr ctrl.Manager) error {
if len(r.WatchTypes) == 0 {
return ctrl.NewControllerManagedBy(mgr).
For(r.Type).
WithOptions(controller.Options{MaxConcurrentReconciles: r.MaxConcurrentReconciles}).
Complete(r)
}
builder := ctrl.NewControllerManagedBy(mgr).For(r.Type)
builder := ctrl.NewControllerManagedBy(mgr).
For(r.Type).
WithOptions(controller.Options{MaxConcurrentReconciles: r.MaxConcurrentReconciles})
for _, t := range r.WatchTypes {
if err := mgr.GetFieldIndexer().IndexField(context.Background(), t, ownerKey, addResourceToIndex); err != nil {
return err
Expand Down
41 changes: 36 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ func main() {
managerOpts.RetryPeriod = &retryPeriod
}

var maxConcurrentReconciles int
if maxConcurrentReconcilesEnvValue := getIntEnv(controllers.MaxConcurrentReconciles); maxConcurrentReconcilesEnvValue > 0 {
maxConcurrentReconciles = maxConcurrentReconcilesEnvValue
log.Info(fmt.Sprintf("maxConcurrentReconciles set to %d", maxConcurrentReconciles))
}

if enableDebugPprof, ok := os.LookupEnv("ENABLE_DEBUG_PPROF"); ok {
pprofEnabled, err := strconv.ParseBool(enableDebugPprof)
if err == nil && pprofEnabled {
Expand Down Expand Up @@ -187,6 +193,7 @@ func main() {
KubernetesClusterDomain: clusterDomain,
ReconcileFunc: &controllers.QueueReconciler{},
ConnectUsingPlainHTTP: usePlainHTTP,
MaxConcurrentReconciles: maxConcurrentReconciles,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller", "controller", controllers.QueueControllerName)
os.Exit(1)
Expand All @@ -202,6 +209,7 @@ func main() {
KubernetesClusterDomain: clusterDomain,
ReconcileFunc: &controllers.ExchangeReconciler{},
ConnectUsingPlainHTTP: usePlainHTTP,
MaxConcurrentReconciles: maxConcurrentReconciles,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller", "controller", controllers.ExchangeControllerName)
os.Exit(1)
Expand All @@ -217,6 +225,7 @@ func main() {
KubernetesClusterDomain: clusterDomain,
ReconcileFunc: &controllers.BindingReconciler{},
ConnectUsingPlainHTTP: usePlainHTTP,
MaxConcurrentReconciles: maxConcurrentReconciles,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller", "controller", controllers.BindingControllerName)
os.Exit(1)
Expand All @@ -233,6 +242,7 @@ func main() {
WatchTypes: []client.Object{},
ReconcileFunc: &controllers.UserReconciler{Client: mgr.GetClient(), Scheme: mgr.GetScheme()},
ConnectUsingPlainHTTP: usePlainHTTP,
MaxConcurrentReconciles: maxConcurrentReconciles,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller", "controller", controllers.UserControllerName)
os.Exit(1)
Expand All @@ -248,6 +258,7 @@ func main() {
KubernetesClusterDomain: clusterDomain,
ReconcileFunc: &controllers.VhostReconciler{Client: mgr.GetClient()},
ConnectUsingPlainHTTP: usePlainHTTP,
MaxConcurrentReconciles: maxConcurrentReconciles,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller", "controller", controllers.VhostControllerName)
os.Exit(1)
Expand All @@ -263,6 +274,7 @@ func main() {
KubernetesClusterDomain: clusterDomain,
ReconcileFunc: &controllers.PolicyReconciler{},
ConnectUsingPlainHTTP: usePlainHTTP,
MaxConcurrentReconciles: maxConcurrentReconciles,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller", "controller", controllers.PolicyControllerName)
os.Exit(1)
Expand All @@ -278,6 +290,7 @@ func main() {
KubernetesClusterDomain: clusterDomain,
ReconcileFunc: &controllers.OperatorPolicyReconciler{},
ConnectUsingPlainHTTP: usePlainHTTP,
MaxConcurrentReconciles: maxConcurrentReconciles,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller", "controller", controllers.OperatorPolicyControllerName)
os.Exit(1)
Expand All @@ -293,6 +306,7 @@ func main() {
KubernetesClusterDomain: clusterDomain,
ReconcileFunc: &controllers.PermissionReconciler{Client: mgr.GetClient(), Scheme: mgr.GetScheme()},
ConnectUsingPlainHTTP: usePlainHTTP,
MaxConcurrentReconciles: maxConcurrentReconciles,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller", "controller", controllers.PermissionControllerName)
os.Exit(1)
Expand All @@ -308,6 +322,7 @@ func main() {
KubernetesClusterDomain: clusterDomain,
ReconcileFunc: &controllers.SchemaReplicationReconciler{Client: mgr.GetClient()},
ConnectUsingPlainHTTP: usePlainHTTP,
MaxConcurrentReconciles: maxConcurrentReconciles,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller", "controller", controllers.SchemaReplicationControllerName)
os.Exit(1)
Expand All @@ -323,6 +338,7 @@ func main() {
KubernetesClusterDomain: clusterDomain,
ReconcileFunc: &controllers.FederationReconciler{Client: mgr.GetClient()},
ConnectUsingPlainHTTP: usePlainHTTP,
MaxConcurrentReconciles: maxConcurrentReconciles,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller", "controller", controllers.FederationControllerName)
os.Exit(1)
Expand All @@ -338,6 +354,7 @@ func main() {
KubernetesClusterDomain: clusterDomain,
ReconcileFunc: &controllers.ShovelReconciler{Client: mgr.GetClient()},
ConnectUsingPlainHTTP: usePlainHTTP,
MaxConcurrentReconciles: maxConcurrentReconciles,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller", "controller", controllers.ShovelControllerName)
os.Exit(1)
Expand All @@ -353,17 +370,19 @@ func main() {
KubernetesClusterDomain: clusterDomain,
ReconcileFunc: &controllers.TopicPermissionReconciler{Client: mgr.GetClient(), Scheme: mgr.GetScheme()},
ConnectUsingPlainHTTP: usePlainHTTP,
MaxConcurrentReconciles: maxConcurrentReconciles,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller", "controller", controllers.TopicPermissionControllerName)
os.Exit(1)
}

if err = (&controllers.SuperStreamReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName(controllers.SuperStreamControllerName),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor(controllers.SuperStreamControllerName),
RabbitmqClientFactory: rabbitmqclient.RabbitholeClientFactory,
Client: mgr.GetClient(),
Log: ctrl.Log.WithName(controllers.SuperStreamControllerName),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor(controllers.SuperStreamControllerName),
RabbitmqClientFactory: rabbitmqclient.RabbitholeClientFactory,
MaxConcurrentReconciles: maxConcurrentReconciles,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller", "controller", controllers.SuperStreamControllerName)
os.Exit(1)
Expand Down Expand Up @@ -456,3 +475,15 @@ func getBoolEnv(envName string) bool {
}
return boolVar
}

func getIntEnv(envName string) int {
var intVar int
if initStr, ok := os.LookupEnv(envName); ok {
var err error
if intVar, err = strconv.Atoi(initStr); err != nil {
log.Error(err, fmt.Sprintf("unable to parse provided '%s'", envName))
os.Exit(1)
}
}
return intVar
}

0 comments on commit 62f01be

Please sign in to comment.