Merge pull request #211 from gchaviaras-NS1/multi-target-support-rebased
Multi-target support v2
k8s-ci-robot authored Dec 24, 2024
2 parents 9c84f0c + d499f0d commit 40a036a
Showing 12 changed files with 196 additions and 53 deletions.
19 changes: 18 additions & 1 deletion README.md
@@ -22,7 +22,7 @@ Usage of cluster-proportional-autoscaler:
--namespace="": Namespace for all operations, fallback to the namespace of this autoscaler(through MY_POD_NAMESPACE env) if not specified.
--poll-period-seconds=10: The time, in seconds, to check cluster status and perform autoscale.
--stderrthreshold=2: logs at or above this threshold go to stderr
--target="": Target to scale. In format: deployment/*, replicationcontroller/* or replicaset/* (not case sensitive).
--target="": Targets to scale. In format: 'deployment/*,replicationcontroller/*,replicaset/*' (not case sensitive, comma delimiter supported).
--v=0: log level for V logs
--version[=false]: Print the version and exit.
--vmodule=: comma-separated list of pattern=N settings for file-filtered logging
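As an illustration of the new comma-delimited form, a hypothetical invocation (the ConfigMap and target names below are placeholders, not taken from this commit):
```
cluster-proportional-autoscaler \
    --namespace=kube-system \
    --configmap=autoscaler-params \
    --target="deployment/first,replicaset/second" \
    --logtostderr=true \
    --v=2
```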
@@ -173,6 +173,23 @@ data:
}
```
## Multi-target support
The `--target` flag defines the workload(s) on which the cluster-proportional-autoscaler
applies its scaling control pattern.
The supported Kubernetes workloads for scaling are `deployment`, `replicationcontroller` and `replicaset`.
A single cluster-proportional-autoscaler can handle multiple targets by passing a comma-delimited list to the `--target` flag.
***Note:*** all targeted workloads must be in the namespace configured via the `--namespace` flag.
```
...
--target="deployment/first,deployment/second"
...
```
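For example, the same flags wired into a Deployment manifest might look like the following sketch; the image tag, command path, and resource names are illustrative, not taken from this commit:
```
containers:
- name: autoscaler
  image: registry.k8s.io/cpa/cluster-proportional-autoscaler:1.8.9 # hypothetical tag
  command:
    - /cluster-proportional-autoscaler
    - --namespace=default
    - --configmap=autoscaler-params
    - --target=deployment/first,deployment/second
    - --logtostderr=true
    - --v=2
```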
## Comparisons to the Horizontal Pod Autoscaler feature
The [Horizontal Pod Autoscaler](http://kubernetes.io/docs/user-guide/horizontal-pod-autoscaling/) is a top-level Kubernetes API resource. It is a closed feedback loop autoscaler which monitors CPU utilization of the pods and scales the number of replicas automatically. It requires the CPU resources to be defined for all containers in the target pods and also requires heapster to be running to provide CPU utilization metrics.
@@ -1,3 +1,4 @@
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
@@ -6,6 +6,7 @@
{{ fail "You must supply a config for either ladder mode or linear mode but not both" }}
{{- end }}
{{ $key := mustFirst (keys $config) }}
---
kind: ConfigMap
apiVersion: v1
metadata:
@@ -1,7 +1,4 @@
{{- $target := default "" .Values.options.target }}
{{- if and (and (not (hasPrefix "deployment/" $target)) (not (hasPrefix "replicationcontroller/" $target))) (not (hasPrefix "replicaset/" $target)) }}
{{ fail "options.target must be one of deployment, replicationcontroller, or replicaset" }}
{{- end }}
---
apiVersion: apps/v1
kind: Deployment
metadata:
@@ -40,7 +37,7 @@ spec:
- --configmap={{ include "cluster-proportional-autoscaler.fullname" . }}
- --logtostderr={{ ternary true false (not (empty .Values.options.logToStdErr)) }}
- --namespace={{ default .Release.Namespace .Values.options.namespace }}
- --target={{- required "options.target must be specified" $target }}
- --target={{- required "options.target must be specified" .Values.options.target }}
- --v={{ .Values.options.logLevel | int }}
{{- with ternary true false (not (empty .Values.options.alsoLogToStdErr)) }}
- --alsologtostderr={{ . }}
1 change: 1 addition & 0 deletions charts/cluster-proportional-autoscaler/templates/role.yaml
@@ -1,3 +1,4 @@
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
@@ -1,4 +1,5 @@
{{- if .Values.serviceAccount.create -}}
---
apiVersion: v1
kind: ServiceAccount
metadata:
18 changes: 12 additions & 6 deletions cmd/cluster-proportional-autoscaler/options/options.go
@@ -80,12 +80,18 @@ func isTargetFormatValid(target string) bool {
glog.Errorf("--target parameter cannot be empty")
return false
}
if !strings.HasPrefix(target, "deployment/") &&
!strings.HasPrefix(target, "replicationcontroller/") &&
!strings.HasPrefix(target, "replicaset/") {
glog.Errorf("Target format error. Please use deployment/*, replicationcontroller/* or replicaset/* (not case sensitive).")
return false

for _, target := range strings.Split(target, ",") {
target := strings.TrimSpace(target)

if !strings.HasPrefix(target, "deployment/") &&
!strings.HasPrefix(target, "replicationcontroller/") &&
!strings.HasPrefix(target, "replicaset/") {
glog.Errorf("Target format error. Please use 'deployment/*,replicationcontroller/*,replicaset/*' (not case sensitive, comma delimiter supported).")
return false
}
}

return true
}
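A self-contained sketch of the behavior the new loop implements, re-declared here so it runs on its own; glog logging is dropped, and lowercasing is made explicit since the flag help documents case-insensitive matching (the shown hunk relies on the caller for that):
```go
package main

import (
	"fmt"
	"strings"
)

// isTargetFormatValid mirrors the comma-delimited check above: split the flag
// value on commas, trim whitespace, and require one of the three supported
// kind prefixes on every element.
func isTargetFormatValid(target string) bool {
	if target == "" {
		return false
	}
	for _, t := range strings.Split(target, ",") {
		t = strings.ToLower(strings.TrimSpace(t))
		if !strings.HasPrefix(t, "deployment/") &&
			!strings.HasPrefix(t, "replicationcontroller/") &&
			!strings.HasPrefix(t, "replicaset/") {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(isTargetFormatValid("DeplOymEnT/anything, replicaset/anything")) // true
	fmt.Println(isTargetFormatValid("deployments/anything"))                     // false
}
```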

@@ -117,7 +123,7 @@ func (c *configMapData) Type() string {

// AddFlags adds flags for a specific AutoScaler to the specified FlagSet
func (c *AutoScalerConfig) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&c.Target, "target", c.Target, "Target to scale. In format: deployment/*, replicationcontroller/* or replicaset/* (not case sensitive).")
fs.StringVar(&c.Target, "target", c.Target, "Target to scale. In format: 'deployment/*,replicationcontroller/*,replicaset/*' (not case sensitive, comma delimiter supported).")
fs.StringVar(&c.ConfigMap, "configmap", c.ConfigMap, "ConfigMap containing our scaling parameters.")
fs.StringVar(&c.Namespace, "namespace", c.Namespace, "Namespace for all operations, fallback to the namespace of this autoscaler(through MY_POD_NAMESPACE env) if not specified.")
fs.IntVar(&c.PollPeriodSeconds, "poll-period-seconds", c.PollPeriodSeconds, "The time, in seconds, to check cluster status and perform autoscale.")
4 changes: 4 additions & 0 deletions cmd/cluster-proportional-autoscaler/options/options_test.go
@@ -42,6 +42,10 @@ func TestIsTargetFormatValid(t *testing.T) {
"DeplOymEnT/anything",
true,
},
{
"DeplOymEnT/anything, replicaset/anything,replicationcontroller/anything",
true,
},
{
"deployments/anything",
false,
2 changes: 1 addition & 1 deletion pkg/autoscaler/autoscaler_server.go
@@ -149,7 +149,7 @@ func (s *AutoScaler) pollAPIServer() error {
glog.V(4).Infof("Expected replica count: %3d", expReplicas)

// Update resource target with expected replicas.
_, err = s.k8sClient.UpdateReplicas(expReplicas)
err = s.k8sClient.UpdateReplicas(expReplicas)
if err != nil {
glog.Errorf("Update failure: %s", err)
}
116 changes: 81 additions & 35 deletions pkg/autoscaler/k8sclient/k8sclient.go
@@ -49,12 +49,12 @@ type K8sClient interface {
// GetNamespace returns the namespace of target resource.
GetNamespace() (namespace string)
// UpdateReplicas updates the number of replicas for the resource and return the previous replicas count
UpdateReplicas(expReplicas int32) (prevReplicas int32, err error)
UpdateReplicas(expReplicas int32) (err error)
}

// k8sClient - Wraps all Kubernetes API client functionalities
type k8sClient struct {
target *scaleTarget
scaleTargets *scaleTargets
clientset kubernetes.Interface
clusterStatus *ClusterStatus
nodeLister corelisters.NodeLister
@@ -100,38 +100,56 @@ func NewK8sClient(clientset kubernetes.Interface, namespace, target string, node
factory.Start(stopCh)
factory.WaitForCacheSync(stopCh)

scaleTarget, err := getScaleTarget(target, namespace)
scaleTargets, err := getScaleTargets(target, namespace)
if err != nil {
return nil, err
}

return &k8sClient{
target: scaleTarget,
clientset: clientset,
nodeLister: nodeLister,
stopCh: stopCh,
scaleTargets: scaleTargets,
clientset: clientset,
nodeLister: nodeLister,
stopCh: stopCh,
}, nil
}

func getScaleTarget(target, namespace string) (*scaleTarget, error) {
splits := strings.Split(target, "/")
func getScaleTargets(targets, namespace string) (*scaleTargets, error) {
st := &scaleTargets{targets: []target{}, namespace: namespace}

for _, el := range strings.Split(targets, ",") {
el := strings.TrimSpace(el)
target, err := getTarget(el)
if err != nil {
return &scaleTargets{}, fmt.Errorf("target format error: %v", targets)
}
st.targets = append(st.targets, target)
}
return st, nil
}

func getTarget(t string) (target, error) {
splits := strings.Split(t, "/")
if len(splits) != 2 {
return &scaleTarget{}, fmt.Errorf("target format error: %v", target)
return target{}, fmt.Errorf("target format error: %v", t)
}
kind := splits[0]
name := splits[1]
return &scaleTarget{kind, name, namespace}, nil
return target{kind, name}, nil
}

// scaleTarget stores the scalable target recourse
type scaleTarget struct {
kind string
name string
type target struct {
kind string
name string
}

// scaleTargets stores the scalable target resources
type scaleTargets struct {
targets []target
namespace string
}
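For illustration, a self-contained sketch of the parsing that getScaleTargets performs; the types are re-declared and the function renamed so the snippet runs on its own, with glog and the error wrapping omitted:
```go
package main

import (
	"fmt"
	"strings"
)

type target struct{ kind, name string }

type scaleTargets struct {
	targets   []target
	namespace string
}

// parseScaleTargets mirrors getScaleTargets above: split the flag value on
// commas, trim whitespace, then split each element into kind/name.
func parseScaleTargets(flagValue, namespace string) (*scaleTargets, error) {
	st := &scaleTargets{namespace: namespace}
	for _, el := range strings.Split(flagValue, ",") {
		el = strings.TrimSpace(el)
		parts := strings.Split(el, "/")
		if len(parts) != 2 {
			return nil, fmt.Errorf("target format error: %v", el)
		}
		st.targets = append(st.targets, target{kind: parts[0], name: parts[1]})
	}
	return st, nil
}

func main() {
	st, err := parseScaleTargets("deployment/first, replicaset/second", "kube-system")
	if err != nil {
		panic(err)
	}
	// Prints: &{targets:[{kind:deployment name:first} {kind:replicaset name:second}] namespace:kube-system}
	fmt.Printf("%+v\n", st)
}
```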

func (k *k8sClient) GetNamespace() (namespace string) {
return k.target.namespace
return k.scaleTargets.namespace
}

func (k *k8sClient) FetchConfigMap(namespace, configmap string) (*v1.ConfigMap, error) {
@@ -200,56 +218,75 @@ func (k *k8sClient) GetClusterStatus() (clusterStatus *ClusterStatus, err error)
return clusterStatus, nil
}

func (k *k8sClient) UpdateReplicas(expReplicas int32) (prevReplicas int32, err error) {
prevReplicas, err = k.updateReplicasAppsV1(expReplicas)
func (k *k8sClient) UpdateReplicas(expReplicas int32) (err error) {
for _, target := range k.scaleTargets.targets {
_, err := k.UpdateTargetReplicas(expReplicas, target)
if err != nil {
return err
}
}
return nil
}
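One consequence of the loop above: UpdateReplicas returns on the first target that fails, so any remaining targets are skipped for that cycle and are only retried on the next poll (per --poll-period-seconds).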

func (k *k8sClient) UpdateTargetReplicas(expReplicas int32, target target) (prevReplicas int32, err error) {
prevReplicas, err = k.updateReplicasAppsV1(expReplicas, target)
if err == nil || !apierrors.IsForbidden(err) {
return prevReplicas, err
}
glog.V(1).Infof("Falling back to extensions/v1beta1, error using apps/v1: %v", err)

// Fall back to using the extensions API if we get a forbidden error
scale, err := k.getScaleExtensionsV1beta1(k.target)
scale, err := k.getScaleExtensionsV1beta1(&target)
if err != nil {
return 0, err
}
prevReplicas = scale.Spec.Replicas
if expReplicas != prevReplicas {
glog.V(0).Infof("Cluster status: SchedulableNodes[%v], TotalNodes[%v], SchedulableCores[%v], TotalCores[%v]", k.clusterStatus.SchedulableNodes, k.clusterStatus.TotalNodes, k.clusterStatus.SchedulableCores, k.clusterStatus.TotalCores)
glog.V(0).Infof("Replicas are not as expected : updating replicas from %d to %d", prevReplicas, expReplicas)
glog.V(0).Infof(
"Cluster status: SchedulableNodes[%v], TotalNodes[%v], SchedulableCores[%v], TotalCores[%v]",
k.clusterStatus.SchedulableNodes,
k.clusterStatus.TotalNodes,
k.clusterStatus.SchedulableCores,
k.clusterStatus.TotalCores)
glog.V(0).Infof("Replicas are not as expected : updating %s/%s from %d to %d",
target.kind,
target.name,
prevReplicas,
expReplicas)
scale.Spec.Replicas = expReplicas
_, err = k.updateScaleExtensionsV1beta1(k.target, scale)
_, err = k.updateScaleExtensionsV1beta1(&target, scale)
if err != nil {
return 0, err
}
}
return prevReplicas, nil
}

func (k *k8sClient) getScaleExtensionsV1beta1(target *scaleTarget) (*extensionsv1beta1.Scale, error) {
func (k *k8sClient) getScaleExtensionsV1beta1(target *target) (*extensionsv1beta1.Scale, error) {
opt := metav1.GetOptions{}
switch strings.ToLower(target.kind) {
case "deployment", "deployments":
return k.clientset.ExtensionsV1beta1().Deployments(target.namespace).GetScale(context.TODO(), target.name, opt)
return k.clientset.ExtensionsV1beta1().Deployments(k.scaleTargets.namespace).GetScale(context.TODO(), target.name, opt)
case "replicaset", "replicasets":
return k.clientset.ExtensionsV1beta1().ReplicaSets(target.namespace).GetScale(context.TODO(), target.name, opt)
return k.clientset.ExtensionsV1beta1().ReplicaSets(k.scaleTargets.namespace).GetScale(context.TODO(), target.name, opt)
default:
return nil, fmt.Errorf("unsupported target kind: %v", target.kind)
}
}

func (k *k8sClient) updateScaleExtensionsV1beta1(target *scaleTarget, scale *extensionsv1beta1.Scale) (*extensionsv1beta1.Scale, error) {
func (k *k8sClient) updateScaleExtensionsV1beta1(target *target, scale *extensionsv1beta1.Scale) (*extensionsv1beta1.Scale, error) {
switch strings.ToLower(target.kind) {
case "deployment", "deployments":
return k.clientset.ExtensionsV1beta1().Deployments(target.namespace).UpdateScale(context.TODO(), target.name, scale, metav1.UpdateOptions{})
return k.clientset.ExtensionsV1beta1().Deployments(k.scaleTargets.namespace).UpdateScale(context.TODO(), target.name, scale, metav1.UpdateOptions{})
case "replicaset", "replicasets":
return k.clientset.ExtensionsV1beta1().ReplicaSets(target.namespace).UpdateScale(context.TODO(), target.name, scale, metav1.UpdateOptions{})
return k.clientset.ExtensionsV1beta1().ReplicaSets(k.scaleTargets.namespace).UpdateScale(context.TODO(), target.name, scale, metav1.UpdateOptions{})
default:
return nil, fmt.Errorf("unsupported target kind: %v", target.kind)
}
}

func (k *k8sClient) updateReplicasAppsV1(expReplicas int32) (prevReplicas int32, err error) {
req, err := requestForTarget(k.clientset.AppsV1().RESTClient().Get(), k.target)
func (k *k8sClient) updateReplicasAppsV1(expReplicas int32, target target) (prevReplicas int32, err error) {
req, err := requestForTarget(k.clientset.AppsV1().RESTClient().Get(), &target, k.scaleTargets.namespace)
if err != nil {
return 0, err
}
@@ -261,10 +298,19 @@ func (k *k8sClient) updateReplicasAppsV1(expReplicas int32) (prevReplicas int32,

prevReplicas = scale.Spec.Replicas
if expReplicas != prevReplicas {
glog.V(0).Infof("Cluster status: SchedulableNodes[%v], TotalNodes[%v], SchedulableCores[%v], TotalCores[%v]", k.clusterStatus.SchedulableNodes, k.clusterStatus.TotalNodes, k.clusterStatus.SchedulableCores, k.clusterStatus.TotalCores)
glog.V(0).Infof("Replicas are not as expected : updating replicas from %d to %d", prevReplicas, expReplicas)
glog.V(0).Infof(
"Cluster status: SchedulableNodes[%v], TotalNodes[%v], SchedulableCores[%v], TotalCores[%v]",
k.clusterStatus.SchedulableNodes,
k.clusterStatus.TotalNodes,
k.clusterStatus.SchedulableCores,
k.clusterStatus.TotalCores)
glog.V(0).Infof("Replicas are not as expected : updating %s/%s from %d to %d",
target.kind,
target.name,
prevReplicas,
expReplicas)
scale.Spec.Replicas = expReplicas
req, err = requestForTarget(k.clientset.AppsV1().RESTClient().Put(), k.target)
req, err = requestForTarget(k.clientset.AppsV1().RESTClient().Put(), &target, k.scaleTargets.namespace)
if err != nil {
return 0, err
}
Expand All @@ -276,7 +322,7 @@ func (k *k8sClient) updateReplicasAppsV1(expReplicas int32) (prevReplicas int32,
return prevReplicas, nil
}

func requestForTarget(req *rest.Request, target *scaleTarget) (*rest.Request, error) {
func requestForTarget(req *rest.Request, target *target, namespace string) (*rest.Request, error) {
var absPath, resource string
// Support the kinds we allowed scaling via the extensions API group
// TODO: switch to use the polymorphic scale client once client-go versions are updated
@@ -297,5 +343,5 @@ func requestForTarget(req *rest.Request, target *scaleTarget) (*rest.Request, er
return nil, fmt.Errorf("unsupported target kind: %v", target.kind)
}

return req.AbsPath(absPath).Namespace(target.namespace).Resource(resource).Name(target.name).SubResource("scale"), nil
return req.AbsPath(absPath).Namespace(namespace).Resource(resource).Name(target.name).SubResource("scale"), nil
}
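For a deployment target named first in namespace kube-system, the scale-subresource request assembled here would take the form (illustrative names):
```
GET /apis/apps/v1/namespaces/kube-system/deployments/first/scale
```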