Skip to content

Commit

Permalink
Separate metric storages (#549)
Browse files Browse the repository at this point in the history
  • Loading branch information
yalosev authored Oct 31, 2023
1 parent 9035c11 commit be3366a
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 108 deletions.
4 changes: 4 additions & 0 deletions pkg/hook/controller/kubernetes_bindings_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ func (c *kubernetesBindingsController) UnlockEvents() {
// UnlockEventsFor looks up the monitor registered under monitorID and turns on
// its kube event callback so that it emits events after Synchronization.
// When no monitor is found for the ID, a warning is logged and nothing is enabled.
func (c *kubernetesBindingsController) UnlockEventsFor(monitorID string) {
	monitor := c.kubeEventsManager.GetMonitor(monitorID)
	if monitor != nil {
		monitor.EnableKubeEventCb()
		return
	}

	// An unknown monitor ID is reported, not treated as a failure.
	log.Warnf("monitor %q was not found", monitorID)
}

Expand Down
25 changes: 12 additions & 13 deletions pkg/shell-operator/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ func Init() (*ShellOperator, error) {
return nil, err
}

err = op.AssembleCommonOperator(app.ListenAddress, app.ListenPort)
err = op.AssembleCommonOperator(app.ListenAddress, app.ListenPort, map[string]string{
"hook": "",
"binding": "",
"queue": "",
})
if err != nil {
log.Errorf("Fatal: %s", err)
return nil, err
Expand All @@ -68,19 +72,14 @@ func Init() (*ShellOperator, error) {
// AssembleCommonOperator instantiates common dependencies. These dependencies
// may be used for shell-operator derivatives, like addon-operator.
// It requires listenAddress and listenPort to run the HTTP server for operator APIs.
func (op *ShellOperator) AssembleCommonOperator(listenAddress, listenPort string) (err error) {
func (op *ShellOperator) AssembleCommonOperator(listenAddress, listenPort string, kubeEventsManagerLabels map[string]string) (err error) {
op.APIServer = newBaseHTTPServer(listenAddress, listenPort)

op.MetricStorage = defaultMetricStorage(op.ctx)
// built-in metrics
op.setupMetricStorage(kubeEventsManagerLabels)

// metrics from user's hooks
op.setupHookMetricStorage()
if err != nil {
return fmt.Errorf("start HTTP server for hook metrics: %s", err)
}
// Set to common metric storage if separate port is not set.
if op.HookMetricStorage == nil {
op.HookMetricStorage = op.MetricStorage
}

// 'main' Kubernetes client.
op.KubeClient, err = initDefaultMainKubeClient(op.MetricStorage)
Expand Down Expand Up @@ -112,14 +111,14 @@ func (op *ShellOperator) AssembleCommonOperator(listenAddress, listenPort string
// - kubernetes events manager
// - schedule manager
func (op *ShellOperator) assembleShellOperator(hooksDir string, tempDir string, debugServer *debug.Server, runtimeConfig *config.Config) (err error) {
registerDefaultRoutes(op)
registerRootRoute(op)
// for shell-operator only
registerHookMetrics(op.HookMetricStorage)

op.RegisterDebugQueueRoutes(debugServer)
op.RegisterDebugHookRoutes(debugServer)
op.RegisterDebugConfigRoutes(debugServer, runtimeConfig)

registerShellOperatorMetrics(op.MetricStorage)

// Create webhookManagers with dependencies.
op.setupHookManagers(hooksDir, tempDir)

Expand Down
12 changes: 3 additions & 9 deletions pkg/shell-operator/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ func (bhs *baseHTTPServer) Start(ctx context.Context) {
srv := &http.Server{
Addr: bhs.address + ":" + bhs.port,
Handler: bhs.router,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
ReadTimeout: 90 * time.Second,
WriteTimeout: 90 * time.Second,
}

go func() {
Expand Down Expand Up @@ -109,7 +109,7 @@ func newBaseHTTPServer(address, port string) *baseHTTPServer {
return srv
}

func registerDefaultRoutes(op *ShellOperator) {
func registerRootRoute(op *ShellOperator) {
op.APIServer.RegisterRoute(http.MethodGet, "/", func(writer http.ResponseWriter, request *http.Request) {
_, _ = fmt.Fprintf(writer, `<html>
<head><title>Shell operator</title></head>
Expand All @@ -125,10 +125,4 @@ func registerDefaultRoutes(op *ShellOperator) {
</body>
</html>`, app.ListenPort)
})

op.APIServer.RegisterRoute(http.MethodGet, "/metrics", func(writer http.ResponseWriter, request *http.Request) {
if op.MetricStorage != nil {
op.MetricStorage.Handler().ServeHTTP(writer, request)
}
})
}
65 changes: 64 additions & 1 deletion pkg/shell-operator/metrics_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,68 @@ func (op *ShellOperator) setupHookMetricStorage() {
op.APIServer.RegisterRoute(http.MethodGet, "/metrics/hooks", metricStorage.Handler().ServeHTTP)
// create new metric storage for hooks
// register scrape handler
op.MetricStorage = metricStorage
op.HookMetricStorage = metricStorage
}

// registerHookMetrics registers the metrics specific to the shell-operator
// HookManager in the given metric storage: metrics for enabling kubernetes
// bindings and metrics for hook executions.
func registerHookMetrics(metricStorage *metric_storage.MetricStorage) {
	// hookOnly builds a fresh label set with a single "hook" label per call.
	hookOnly := func() map[string]string {
		return map[string]string{"hook": ""}
	}

	// Metrics for enabling kubernetes bindings.
	metricStorage.RegisterGauge("{PREFIX}hook_enable_kubernetes_bindings_seconds", hookOnly())
	metricStorage.RegisterCounter("{PREFIX}hook_enable_kubernetes_bindings_errors_total", hookOnly())
	metricStorage.RegisterGauge("{PREFIX}hook_enable_kubernetes_bindings_success", hookOnly())

	// Label set shared by all hook execution metrics.
	runLabels := map[string]string{
		"hook":    "",
		"binding": "",
		"queue":   "",
	}

	// secondsBuckets returns a fresh copy of the bucket bounds (in seconds)
	// used by every hook execution histogram below.
	secondsBuckets := func() []float64 {
		return []float64{
			0.0,
			0.02, 0.05, // 20,50 milliseconds
			0.1, 0.2, 0.5, // 100,200,500 milliseconds
			1, 2, 5, // 1,2,5 seconds
			10, 20, 50, // 10,20,50 seconds
			100, 200, 500, // 100,200,500 seconds
		}
	}

	// Duration of hook execution.
	metricStorage.RegisterHistogram("{PREFIX}hook_run_seconds", runLabels, secondsBuckets())
	// User CPU usage.
	metricStorage.RegisterHistogram("{PREFIX}hook_run_user_cpu_seconds", runLabels, secondsBuckets())
	// System CPU usage.
	metricStorage.RegisterHistogram("{PREFIX}hook_run_sys_cpu_seconds", runLabels, secondsBuckets())

	// Max RSS in bytes.
	metricStorage.RegisterGauge("{PREFIX}hook_run_max_rss_bytes", runLabels)

	// Hook run outcome counters.
	metricStorage.RegisterCounter("{PREFIX}hook_run_errors_total", runLabels)
	metricStorage.RegisterCounter("{PREFIX}hook_run_allowed_errors_total", runLabels)
	metricStorage.RegisterCounter("{PREFIX}hook_run_success_total", runLabels)

	// Waiting time of hook_run tasks in the queue.
	metricStorage.RegisterCounter("{PREFIX}task_wait_in_queue_seconds_total", runLabels)
}
102 changes: 18 additions & 84 deletions pkg/shell-operator/metrics_operator.go
Original file line number Diff line number Diff line change
@@ -1,38 +1,35 @@
package shell_operator

import (
"context"
"net/http"

"github.com/flant/shell-operator/pkg/app"
"github.com/flant/shell-operator/pkg/metric_storage"
)

func defaultMetricStorage(ctx context.Context) *metric_storage.MetricStorage {
metricStorage := metric_storage.NewMetricStorage(ctx, app.PrometheusMetricsPrefix, false)
return metricStorage
}
// setupMetricStorage creates and initializes metrics storage for built-in operator metrics
func (op *ShellOperator) setupMetricStorage(kubeEventsManagerLabels map[string]string) {
metricStorage := metric_storage.NewMetricStorage(op.ctx, app.PrometheusMetricsPrefix, false)

registerCommonMetrics(metricStorage)
registerTaskQueueMetrics(metricStorage)
registerKubeEventsManagerMetrics(metricStorage, kubeEventsManagerLabels)

// registerShellOperatorMetrics register all metrics needed for the ShellOperator.
func registerShellOperatorMetrics(metricStorage *metric_storage.MetricStorage) {
RegisterCommonMetrics(metricStorage)
RegisterTaskQueueMetrics(metricStorage)
RegisterKubeEventsManagerMetrics(metricStorage, map[string]string{
"hook": "",
"binding": "",
"queue": "",
})
registerHookMetrics(metricStorage)
op.APIServer.RegisterRoute(http.MethodGet, "/metrics", metricStorage.Handler().ServeHTTP)
// the scrape handler for built-in metrics is registered above;
// keep the new metric storage on the operator
op.MetricStorage = metricStorage
}

// RegisterCommonMetrics register base metric
// registerCommonMetrics register base metric
// This function is used in the addon-operator
func RegisterCommonMetrics(metricStorage *metric_storage.MetricStorage) {
func registerCommonMetrics(metricStorage *metric_storage.MetricStorage) {
metricStorage.RegisterCounter("{PREFIX}live_ticks", map[string]string{})
}

// RegisterTaskQueueMetrics
// registerTaskQueueMetrics
// This function is used in the addon-operator
func RegisterTaskQueueMetrics(metricStorage *metric_storage.MetricStorage) {
func registerTaskQueueMetrics(metricStorage *metric_storage.MetricStorage) {
metricStorage.RegisterHistogram(
"{PREFIX}tasks_queue_action_duration_seconds",
map[string]string{
Expand All @@ -51,9 +48,9 @@ func RegisterTaskQueueMetrics(metricStorage *metric_storage.MetricStorage) {
metricStorage.RegisterGauge("{PREFIX}tasks_queue_length", map[string]string{"queue": ""})
}

// RegisterKubeEventsManagerMetrics registers metrics for kube_event_manager
// registerKubeEventsManagerMetrics registers metrics for kube_event_manager
// This function is used in the addon-operator
func RegisterKubeEventsManagerMetrics(metricStorage *metric_storage.MetricStorage, labels map[string]string) {
func registerKubeEventsManagerMetrics(metricStorage *metric_storage.MetricStorage, labels map[string]string) {
// Count of objects in snapshot for one kubernetes binding.
metricStorage.RegisterGauge("{PREFIX}kube_snapshot_objects", labels)
// Duration of jqFilter applying.
Expand Down Expand Up @@ -84,66 +81,3 @@ func RegisterKubeEventsManagerMetrics(metricStorage *metric_storage.MetricStorag
// Count of watch errors.
metricStorage.RegisterCounter("{PREFIX}kubernetes_client_watch_errors_total", map[string]string{"error_type": ""})
}

// registerHookMetrics registers the shell-operator specific metrics for
// HookManager in the given metric storage: metrics for enabling kubernetes
// bindings and metrics for hook executions.
func registerHookMetrics(metricStorage *metric_storage.MetricStorage) {
	// hookOnly builds a fresh label set with a single "hook" label per call.
	hookOnly := func() map[string]string {
		return map[string]string{"hook": ""}
	}

	// Metrics for enabling kubernetes bindings.
	metricStorage.RegisterGauge("{PREFIX}hook_enable_kubernetes_bindings_seconds", hookOnly())
	metricStorage.RegisterCounter("{PREFIX}hook_enable_kubernetes_bindings_errors_total", hookOnly())
	metricStorage.RegisterGauge("{PREFIX}hook_enable_kubernetes_bindings_success", hookOnly())

	// Label set shared by all hook execution metrics.
	runLabels := map[string]string{
		"hook":    "",
		"binding": "",
		"queue":   "",
	}

	// secondsBuckets returns a fresh copy of the bucket bounds (in seconds)
	// used by every hook execution histogram below.
	secondsBuckets := func() []float64 {
		return []float64{
			0.0,
			0.02, 0.05, // 20,50 milliseconds
			0.1, 0.2, 0.5, // 100,200,500 milliseconds
			1, 2, 5, // 1,2,5 seconds
			10, 20, 50, // 10,20,50 seconds
			100, 200, 500, // 100,200,500 seconds
		}
	}

	// Duration of hook execution.
	metricStorage.RegisterHistogram("{PREFIX}hook_run_seconds", runLabels, secondsBuckets())
	// User CPU usage.
	metricStorage.RegisterHistogram("{PREFIX}hook_run_user_cpu_seconds", runLabels, secondsBuckets())
	// System CPU usage.
	metricStorage.RegisterHistogram("{PREFIX}hook_run_sys_cpu_seconds", runLabels, secondsBuckets())

	// Max RSS in bytes.
	metricStorage.RegisterGauge("{PREFIX}hook_run_max_rss_bytes", runLabels)

	// Hook run outcome counters.
	metricStorage.RegisterCounter("{PREFIX}hook_run_errors_total", runLabels)
	metricStorage.RegisterCounter("{PREFIX}hook_run_allowed_errors_total", runLabels)
	metricStorage.RegisterCounter("{PREFIX}hook_run_success_total", runLabels)

	// Waiting time of hook_run tasks in the queue.
	metricStorage.RegisterCounter("{PREFIX}task_wait_in_queue_seconds_total", runLabels)
}
3 changes: 2 additions & 1 deletion pkg/shell-operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ type ShellOperator struct {
// APIServer common http server for liveness and metrics endpoints
APIServer *baseHTTPServer

// MetricStorage collects and stores metrics for built-in operator primitives, hook execution
MetricStorage *metric_storage.MetricStorage
// separate metric storage for hook metrics if separate listen port is configured
// HookMetricStorage is a separate metric storage for metrics returned by user hooks
HookMetricStorage *metric_storage.MetricStorage
KubeClient *klient.Client
ObjectPatcher *object_patch.ObjectPatcher
Expand Down

0 comments on commit be3366a

Please sign in to comment.