diff --git a/config/config.go b/config/config.go index edd5a05bc9b6..8a202b74fc90 100644 --- a/config/config.go +++ b/config/config.go @@ -222,6 +222,7 @@ type HDFSArtifactRepository struct { type PrometheusConfig struct { Enabled bool `json:"enabled,omitempty"` DisableLegacy bool `json:"disableLegacy"` + MetricsTTL TTL `json:"metricsTTL"` Path string `json:"path,omitempty"` Port string `json:"port,omitempty"` } diff --git a/config/ttl.go b/config/ttl.go index 83c4d0c7b320..d26f00fcb806 100644 --- a/config/ttl.go +++ b/config/ttl.go @@ -28,8 +28,23 @@ func (l *TTL) UnmarshalJSON(b []byte) error { return nil } if strings.HasSuffix(value, "d") { - hours, err := strconv.Atoi(strings.TrimSuffix(value, "d")) - *l = TTL(time.Duration(hours) * 24 * time.Hour) + days, err := strconv.Atoi(strings.TrimSuffix(value, "d")) + *l = TTL(time.Duration(days) * 24 * time.Hour) + return err + } + if strings.HasSuffix(value, "h") { + hours, err := strconv.Atoi(strings.TrimSuffix(value, "h")) + *l = TTL(time.Duration(hours) * time.Hour) + return err + } + if strings.HasSuffix(value, "m") { + minutes, err := strconv.Atoi(strings.TrimSuffix(value, "m")) + *l = TTL(time.Duration(minutes) * time.Minute) + return err + } + if strings.HasSuffix(value, "s") { + seconds, err := strconv.Atoi(strings.TrimSuffix(value, "s")) + *l = TTL(time.Duration(seconds) * time.Second) return err } d, err := time.ParseDuration(value) diff --git a/manifests/quick-start-mysql.yaml b/manifests/quick-start-mysql.yaml index 300771fce1ce..37845a837bd5 100644 --- a/manifests/quick-start-mysql.yaml +++ b/manifests/quick-start-mysql.yaml @@ -363,6 +363,7 @@ data: scope: pod url: http://logging-facility?namespace=${metadata.namespace}&podName=${metadata.name} metricsConfig: | + disableLegacy: true enabled: true path: /metrics port: 9090 diff --git a/manifests/quick-start-no-db.yaml b/manifests/quick-start-no-db.yaml index 7326f65bf465..3cac48aa8519 100644 --- a/manifests/quick-start-no-db.yaml +++ b/manifests/quick-start-no-db.yaml @@ -363,6 +363,7 @@ data: scope: pod url: http://logging-facility?namespace=${metadata.namespace}&podName=${metadata.name} metricsConfig: | + disableLegacy: true enabled: true path: /metrics port: 9090 diff --git a/manifests/quick-start-postgres.yaml b/manifests/quick-start-postgres.yaml index db232fe4678f..228c9140a3ef 100644 --- a/manifests/quick-start-postgres.yaml +++ b/manifests/quick-start-postgres.yaml @@ -363,6 +363,7 @@ data: scope: pod url: http://logging-facility?namespace=${metadata.namespace}&podName=${metadata.name} metricsConfig: | + disableLegacy: true enabled: true path: /metrics port: 9090 diff --git a/manifests/quick-start/base/overlays/workflow-controller-configmap.yaml b/manifests/quick-start/base/overlays/workflow-controller-configmap.yaml index 04965259d0d7..ce1567eecd93 100644 --- a/manifests/quick-start/base/overlays/workflow-controller-configmap.yaml +++ b/manifests/quick-start/base/overlays/workflow-controller-configmap.yaml @@ -15,6 +15,7 @@ data: featureFlags: | resourcesDuration: true metricsConfig: | + disableLegacy: true enabled: true path: /metrics port: 9090 @@ -27,4 +28,4 @@ data: url: http://logging-facility?namespace=${metadata.namespace}&podName=${metadata.name} kind: ConfigMap metadata: - name: workflow-controller-configmap \ No newline at end of file + name: workflow-controller-configmap diff --git a/persist/sqldb/mocks/WorkflowArchive.go b/persist/sqldb/mocks/WorkflowArchive.go index 79b73cc54c63..657ca1df446a 100644 --- a/persist/sqldb/mocks/WorkflowArchive.go +++ b/persist/sqldb/mocks/WorkflowArchive.go @@ -30,13 +30,13 @@ func (_m *WorkflowArchive) ArchiveWorkflow(wf *v1alpha1.Workflow) error { return r0 } -// DeleteWorkflow provides a mock function with given fields: uid -func (_m *WorkflowArchive) DeleteWorkflow(uid string) error { - ret := _m.Called(uid) +// DeleteExpiredWorkflows provides a mock function with given fields: ttl +func (_m *WorkflowArchive) DeleteExpiredWorkflows(ttl time.Duration) error { + ret := _m.Called(ttl) var r0 error - if rf, ok := ret.Get(0).(func(string) error); ok { - r0 = rf(uid) + if rf, ok := ret.Get(0).(func(time.Duration) error); ok { + r0 = rf(ttl) } else { r0 = ret.Error(0) } @@ -44,13 +44,13 @@ func (_m *WorkflowArchive) DeleteWorkflow(uid string) error { return r0 } -// DeleteWorkflows provides a mock function with given fields: ttl -func (_m *WorkflowArchive) DeleteWorkflows(ttl time.Duration) error { - ret := _m.Called(ttl) +// DeleteWorkflow provides a mock function with given fields: uid +func (_m *WorkflowArchive) DeleteWorkflow(uid string) error { + ret := _m.Called(uid) var r0 error - if rf, ok := ret.Get(0).(func(time.Duration) error); ok { - r0 = rf(ttl) + if rf, ok := ret.Get(0).(func(string) error); ok { + r0 = rf(uid) } else { r0 = ret.Error(0) } diff --git a/persist/sqldb/null_workflow_archive.go b/persist/sqldb/null_workflow_archive.go index ae252bdb9b98..fcc177e19e29 100644 --- a/persist/sqldb/null_workflow_archive.go +++ b/persist/sqldb/null_workflow_archive.go @@ -30,6 +30,6 @@ func (r *nullWorkflowArchive) DeleteWorkflow(string) error { return fmt.Errorf("deleting archived workflows not supported") } -func (r *nullWorkflowArchive) DeleteWorkflows(time.Duration) error { +func (r *nullWorkflowArchive) DeleteExpiredWorkflows(time.Duration) error { return nil } diff --git a/persist/sqldb/workflow_archive.go b/persist/sqldb/workflow_archive.go index 132ed38699b5..9f5a79cf412d 100644 --- a/persist/sqldb/workflow_archive.go +++ b/persist/sqldb/workflow_archive.go @@ -48,7 +48,7 @@ type WorkflowArchive interface { ListWorkflows(namespace string, minStartAt, maxStartAt time.Time, labelRequirements labels.Requirements, limit, offset int) (wfv1.Workflows, error) GetWorkflow(uid string) (*wfv1.Workflow, error) DeleteWorkflow(uid string) error - DeleteWorkflows(ttl time.Duration) error + DeleteExpiredWorkflows(ttl time.Duration) error } type workflowArchive struct { @@ -220,7 +220,7 @@ func (r *workflowArchive) DeleteWorkflow(uid string) error { return nil } -func (r *workflowArchive) DeleteWorkflows(ttl time.Duration) error { +func (r *workflowArchive) DeleteExpiredWorkflows(ttl time.Duration) error { rs, err := r.session. DeleteFrom(archiveTableName). Where(r.clusterManagedNamespaceAndInstanceID()). @@ -235,4 +235,4 @@ func (r *workflowArchive) DeleteWorkflows(ttl time.Duration) error { } log.WithFields(log.Fields{"rowsAffected": rowsAffected}).Info("Deleted archived workflows") return nil -} \ No newline at end of file +} diff --git a/workflow/common/metrics.go b/workflow/common/metrics.go new file mode 100644 index 000000000000..bab514c08452 --- /dev/null +++ b/workflow/common/metrics.go @@ -0,0 +1,12 @@ +package common + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +type Metric struct { + Metric prometheus.Metric + LastUpdated time.Time +} diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 1e0056772283..f93e133036b5 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -83,7 +83,7 @@ type WorkflowController struct { session sqlbuilder.Database offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo wfArchive sqldb.WorkflowArchive - Metrics map[string]prometheus.Metric + Metrics map[string]common.Metric } const ( @@ -120,7 +120,7 @@ func NewWorkflowController( configController: config.NewController(namespace, configMap, kubeclientset), completedPods: make(chan string, 512), gcPods: make(chan string, 512), - Metrics: make(map[string]prometheus.Metric), + Metrics: make(map[string]common.Metric), } wfc.throttler = NewThrottler(0, wfc.wfQueue) return &wfc @@ -187,6 +187,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, podWorkers in go wfc.podGarbageCollector(ctx.Done()) go wfc.workflowGarbageCollector(ctx.Done()) go wfc.archivedWorkflowGarbageCollector(ctx.Done()) + go wfc.metricsGarbageCollector(ctx.Done()) wfc.createClusterWorkflowTemplateInformer(ctx) @@ -382,7 +383,7 @@ func (wfc *WorkflowController) archivedWorkflowGarbageCollector(stopCh <-chan st return case <-ticker.C: log.Info("Performing archived workflow GC") - err := wfc.wfArchive.DeleteWorkflows(time.Duration(ttl)) + err := wfc.wfArchive.DeleteExpiredWorkflows(time.Duration(ttl)) if err != nil { log.WithField("err", err).Error("Failed to delete archived workflows") } @@ -390,6 +391,32 @@ func (wfc *WorkflowController) archivedWorkflowGarbageCollector(stopCh <-chan st } } +func (wfc *WorkflowController) metricsGarbageCollector(stopCh <-chan struct{}) { + if !wfc.Config.MetricsConfig.Enabled { + log.Info("Cannot start metrics GC: metrics are disabled") + return + } + ttl := wfc.Config.MetricsConfig.MetricsTTL + if ttl == config.TTL(0) { + log.Info("Metrics TTL is zero, metrics GC is disabled") + return + } + duration := time.Duration(ttl) + log.WithFields(log.Fields{"ttl": ttl}).Info("Performing metrics GC") + ticker := time.NewTicker(duration) + defer ticker.Stop() + for { + select { + case <-stopCh: + log.Info("Stopping metrics GC") + return + case <-ticker.C: + log.Info("Performing metrics GC") + wfc.DeleteExpiredMetrics(duration) + } + } +} + func (wfc *WorkflowController) runWorker() { for wfc.processNextItem() { } @@ -736,6 +763,18 @@ func (wfc *WorkflowController) GetContainerRuntimeExecutor() string { return wfc.Config.ContainerRuntimeExecutor } -func (wfc *WorkflowController) GetMetrics() map[string]prometheus.Metric { - return wfc.Metrics +func (wfc *WorkflowController) GetMetrics() []prometheus.Metric { + var out []prometheus.Metric + for _, metric := range wfc.Metrics { + out = append(out, metric.Metric) + } + return out +} + +func (wfc *WorkflowController) DeleteExpiredMetrics(ttl time.Duration) { + for key, metric := range wfc.Metrics { + if time.Since(metric.LastUpdated) > ttl { + delete(wfc.Metrics, key) + } + } } diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index ff7920495e4c..d830816356e5 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -5,7 +5,8 @@ import ( "testing" "time" - "github.com/prometheus/client_golang/prometheus" + "github.com/argoproj/argo/workflow/common" + "github.com/stretchr/testify/assert" authorizationv1 "k8s.io/api/authorization/v1" apiv1 "k8s.io/api/core/v1" @@ -125,7 +126,7 @@ func newController(objects ...runtime.Object) (context.CancelFunc, *WorkflowCont wfQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), podQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), wfArchive: sqldb.NullWorkflowArchive, - Metrics: make(map[string]prometheus.Metric), + Metrics: make(map[string]common.Metric), } return cancel, controller } @@ -326,3 +327,21 @@ func TestWorkflowController_archivedWorkflowGarbageCollector(t *testing.T) { controller.archivedWorkflowGarbageCollector(make(chan struct{})) } + +func TestWorkflowControllerMetricsGarbageCollector(t *testing.T) { + cancel, controller := newController() + defer cancel() + + controller.Metrics["metric-1"] = common.Metric{Metric: nil, LastUpdated: time.Now().Add(-1 * time.Minute)} + controller.Metrics["metric-2"] = common.Metric{Metric: nil, LastUpdated: time.Now().Add(3 * time.Second)} + + controller.Config.MetricsConfig.Enabled = true + controller.Config.MetricsConfig.MetricsTTL = config.TTL(1 * time.Second) + + stop := make(chan struct{}) + go func() { time.Sleep(2 * time.Second); stop <- struct{}{} }() + controller.metricsGarbageCollector(stop) + + assert.Contains(t, controller.Metrics, "metric-2") + assert.NotContains(t, controller.Metrics, "metric-1") +} diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index f756bf770cec..fb67b2a056ca 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -2494,7 +2494,7 @@ func (woc *wfOperationCtx) computeMetrics(metricList []*wfv1.Prometheus, localSc continue } updatedMetric := metrics.ConstructRealTimeGaugeMetric(metricTmpl, valueFunc) - woc.controller.Metrics[metricTmpl.GetDesc()] = updatedMetric + woc.controller.Metrics[metricTmpl.GetDesc()] = common.Metric{Metric: updatedMetric, LastUpdated: time.Now()} continue } else { metricSpec := metricTmpl.DeepCopy() @@ -2508,14 +2508,14 @@ func (woc *wfOperationCtx) computeMetrics(metricList []*wfv1.Prometheus, localSc } metricSpec.SetValueString(replacedValue) - metric := woc.controller.Metrics[metricSpec.GetDesc()] + metric := woc.controller.Metrics[metricSpec.GetDesc()].Metric // It is valid to pass a nil metric to ConstructOrUpdateMetric, in that case the metric will be created for us updatedMetric, err := metrics.ConstructOrUpdateMetric(metric, metricSpec) if err != nil { woc.reportMetricEmissionError(fmt.Sprintf("could not compute metric '%s': %s", metricSpec.Name, err)) continue } - woc.controller.Metrics[metricSpec.GetDesc()] = updatedMetric + woc.controller.Metrics[metricSpec.GetDesc()] = common.Metric{Metric: updatedMetric, LastUpdated: time.Now()} continue } } diff --git a/workflow/controller/operator_metrics_test.go b/workflow/controller/operator_metrics_test.go index 11891e9a891b..8c789faac9b8 100644 --- a/workflow/controller/operator_metrics_test.go +++ b/workflow/controller/operator_metrics_test.go @@ -61,7 +61,7 @@ func TestBasicMetric(t *testing.T) { metricDesc := wf.Spec.Templates[0].Metrics.Prometheus[0].GetDesc() assert.Contains(t, controller.Metrics, metricDesc) - metric := controller.Metrics[metricDesc].(prometheus.Gauge) + metric := controller.Metrics[metricDesc].Metric.(prometheus.Gauge) metrtcString, err := getMetricStringValue(metric) assert.NoError(t, err) assert.Contains(t, metrtcString, `label: gauge: counter:`) - metricErrorCounter := controller.Metrics[metricErrorDesc].(prometheus.Counter) + metricErrorCounter := controller.Metrics[metricErrorDesc].Metric.(prometheus.Counter) metricErrorCounterString, err := getMetricStringValue(metricErrorCounter) assert.NoError(t, err) assert.Contains(t, metricErrorCounterString, `label: counter:`) diff --git a/workflow/metrics/collector.go b/workflow/metrics/collector.go index a45c16022d32..c2cc2c186fa7 100644 --- a/workflow/metrics/collector.go +++ b/workflow/metrics/collector.go @@ -2,6 +2,7 @@ package metrics import ( "os" + "time" "github.com/prometheus/client_golang/prometheus" "k8s.io/client-go/tools/cache" @@ -15,7 +16,8 @@ const ( ) type MetricsProvider interface { - GetMetrics() map[string]prometheus.Metric + GetMetrics() []prometheus.Metric + DeleteExpiredMetrics(ttl time.Duration) } func NewMetricsRegistry(metricsProvider MetricsProvider, informer cache.SharedIndexInformer, disableLegacyMetrics bool) *prometheus.Registry {