From 1f14f2a5f6054a88f740c6799d443216f694f08f Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Wed, 22 Apr 2020 21:12:51 -0700 Subject: [PATCH] feat(archive): Implement data retention. Closes #2273 (#2312) --- config/config.go | 4 +- config/ttl.go | 44 ++++++++++++++++ config/ttl_test.go | 32 ++++++++++++ docs/workflow-controller-configmap.yaml | 2 + manifests/quick-start-mysql.yaml | 1 + manifests/quick-start-postgres.yaml | 1 + .../workflow-controller-configmap.yaml | 1 + .../workflow-controller-configmap.yaml | 1 + persist/sqldb/migrate.go | 2 + persist/sqldb/mocks/WorkflowArchive.go | 14 +++++ persist/sqldb/null_workflow_archive.go | 4 ++ persist/sqldb/workflow_archive.go | 51 ++++++++++++++----- server/apiserver/argoserver.go | 2 +- sonar-project.properties | 3 ++ test/e2e/fixtures/persistence.go | 2 +- .../workflow-controller-deployment.yaml | 2 + util/api/util.go | 18 ------- workflow/controller/config.go | 2 +- workflow/controller/controller.go | 45 +++++++++++++++- workflow/controller/controller_test.go | 7 +++ 20 files changed, 201 insertions(+), 37 deletions(-) create mode 100644 config/ttl.go create mode 100644 config/ttl_test.go delete mode 100644 util/api/util.go diff --git a/config/config.go b/config/config.go index a5b63a3e42af..edd5a05bc9b6 100644 --- a/config/config.go +++ b/config/config.go @@ -131,7 +131,9 @@ func (a *ArtifactRepository) IsArchiveLogs() bool { type PersistConfig struct { NodeStatusOffload bool `json:"nodeStatusOffLoad,omitempty"` // Archive workflows to persistence. - Archive bool `json:"archive,omitempty"` + Archive bool `json:"archive,omitempty"` + // in days + ArchiveTTL TTL `json:"archiveTTL,omitempty"` ClusterName string `json:"clusterName,omitempty"` ConnectionPool *ConnectionPool `json:"connectionPool"` PostgreSQL *PostgreSQLConfig `json:"postgresql,omitempty"` diff --git a/config/ttl.go b/config/ttl.go new file mode 100644 index 000000000000..83c4d0c7b320 --- /dev/null +++ b/config/ttl.go @@ -0,0 +1,44 @@ +package config + +import ( + "encoding/json" + "errors" + "strconv" + "strings" + "time" +) + +// time.Duration forces you to specify in millis, and does not support days +// see https://stackoverflow.com/questions/48050945/how-to-unmarshal-json-into-durations +type TTL time.Duration + +func (l TTL) MarshalJSON() ([]byte, error) { + return json.Marshal(time.Duration(l).String()) +} + +func (l *TTL) UnmarshalJSON(b []byte) error { + var v interface{} + if err := json.Unmarshal(b, &v); err != nil { + return err + } + switch value := v.(type) { + case string: + if value == "" { + *l = 0 + return nil + } + if strings.HasSuffix(value, "d") { + hours, err := strconv.Atoi(strings.TrimSuffix(value, "d")) + *l = TTL(time.Duration(hours) * 24 * time.Hour) + return err + } + d, err := time.ParseDuration(value) + if err != nil { + return err + } + *l = TTL(d) + return nil + default: + return errors.New("invalid TTL") + } +} diff --git a/config/ttl_test.go b/config/ttl_test.go new file mode 100644 index 000000000000..8466fbf32f9b --- /dev/null +++ b/config/ttl_test.go @@ -0,0 +1,32 @@ +package config + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestTTL(t *testing.T) { + t.Run("Empty", func(t *testing.T) { + ttl := TTL(-1) + err := ttl.UnmarshalJSON([]byte(`""`)) + if assert.NoError(t, err) { + assert.Equal(t, TTL(0), ttl) + } + }) + t.Run("1h", func(t *testing.T) { + ttl := TTL(-1) + err := ttl.UnmarshalJSON([]byte(`"1h"`)) + if assert.NoError(t, err) { + assert.Equal(t, TTL(1*time.Hour), ttl) + } + }) + t.Run("1d", func(t *testing.T) { + ttl := TTL(-1) + err := ttl.UnmarshalJSON([]byte(`"1d"`)) + if assert.NoError(t, err) { + assert.Equal(t, TTL(24*time.Hour), ttl) + } + }) +} diff --git a/docs/workflow-controller-configmap.yaml b/docs/workflow-controller-configmap.yaml index 91bee362b0cf..c9e1d4191fef 100644 --- a/docs/workflow-controller-configmap.yaml +++ b/docs/workflow-controller-configmap.yaml @@ -143,6 +143,8 @@ data: nodeStatusOffLoad: false # save completed workloads to the workflow archive archive: false + # the number of days to keep archived workflows (the default is forever) + archiveTTL: 180d # Optional name of the cluster I'm running in. This must be unique for your cluster. clusterName: default postgresql: diff --git a/manifests/quick-start-mysql.yaml b/manifests/quick-start-mysql.yaml index 7f6c2d0e44f3..c039ebaebd38 100644 --- a/manifests/quick-start-mysql.yaml +++ b/manifests/quick-start-mysql.yaml @@ -369,6 +369,7 @@ data: maxOpenConns: 0 nodeStatusOffLoad: true archive: true + archiveTTL: 7d mysql: host: mysql port: 3306 diff --git a/manifests/quick-start-postgres.yaml b/manifests/quick-start-postgres.yaml index 6f5a4c3e96fa..469701eaba41 100644 --- a/manifests/quick-start-postgres.yaml +++ b/manifests/quick-start-postgres.yaml @@ -369,6 +369,7 @@ data: maxOpenConns: 0 nodeStatusOffLoad: true archive: true + archiveTTL: 7d postgresql: host: postgres port: 5432 diff --git a/manifests/quick-start/mysql/overlays/workflow-controller-configmap.yaml b/manifests/quick-start/mysql/overlays/workflow-controller-configmap.yaml index 2b22dbfe9018..ae2914b1c0f5 100644 --- a/manifests/quick-start/mysql/overlays/workflow-controller-configmap.yaml +++ b/manifests/quick-start/mysql/overlays/workflow-controller-configmap.yaml @@ -6,6 +6,7 @@ data: maxOpenConns: 0 nodeStatusOffLoad: true archive: true + archiveTTL: 7d mysql: host: mysql port: 3306 diff --git a/manifests/quick-start/postgres/overlays/workflow-controller-configmap.yaml b/manifests/quick-start/postgres/overlays/workflow-controller-configmap.yaml index 16ea1dae6684..68bf894877e2 100644 --- a/manifests/quick-start/postgres/overlays/workflow-controller-configmap.yaml +++ b/manifests/quick-start/postgres/overlays/workflow-controller-configmap.yaml @@ -6,6 +6,7 @@ data: maxOpenConns: 0 nodeStatusOffLoad: true archive: true + archiveTTL: 7d postgresql: host: postgres port: 5432 diff --git a/persist/sqldb/migrate.go b/persist/sqldb/migrate.go index b1499dadd2b9..b3507f2d9ca0 100644 --- a/persist/sqldb/migrate.go +++ b/persist/sqldb/migrate.go @@ -247,6 +247,8 @@ func (m migrate) Exec(ctx context.Context) error { ), // add m.tableName index ansiSQLChange(`create index ` + m.tableName + `_i1 on ` + m.tableName + ` (clustername,namespace,updatedat)`), + // index to find records that need deleting, this omits namespaces as this might be null + ansiSQLChange(`create index argo_archived_workflows_i2 on argo_archived_workflows (clustername,instanceid,finishedat)`), } { err := m.applyChange(ctx, changeSchemaVersion, change) if err != nil { diff --git a/persist/sqldb/mocks/WorkflowArchive.go b/persist/sqldb/mocks/WorkflowArchive.go index cf41ec38a860..79b73cc54c63 100644 --- a/persist/sqldb/mocks/WorkflowArchive.go +++ b/persist/sqldb/mocks/WorkflowArchive.go @@ -44,6 +44,20 @@ func (_m *WorkflowArchive) DeleteWorkflow(uid string) error { return r0 } +// DeleteWorkflows provides a mock function with given fields: ttl +func (_m *WorkflowArchive) DeleteWorkflows(ttl time.Duration) error { + ret := _m.Called(ttl) + + var r0 error + if rf, ok := ret.Get(0).(func(time.Duration) error); ok { + r0 = rf(ttl) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // GetWorkflow provides a mock function with given fields: uid func (_m *WorkflowArchive) GetWorkflow(uid string) (*v1alpha1.Workflow, error) { ret := _m.Called(uid) diff --git a/persist/sqldb/null_workflow_archive.go b/persist/sqldb/null_workflow_archive.go index b3f3bc5d2b82..ae252bdb9b98 100644 --- a/persist/sqldb/null_workflow_archive.go +++ b/persist/sqldb/null_workflow_archive.go @@ -29,3 +29,7 @@ func (r *nullWorkflowArchive) GetWorkflow(string) (*wfv1.Workflow, error) { func (r *nullWorkflowArchive) DeleteWorkflow(string) error { return fmt.Errorf("deleting archived workflows not supported") } + +func (r *nullWorkflowArchive) DeleteWorkflows(time.Duration) error { + return nil +} diff --git a/persist/sqldb/workflow_archive.go b/persist/sqldb/workflow_archive.go index 502cf8064f84..132ed38699b5 100644 --- a/persist/sqldb/workflow_archive.go +++ b/persist/sqldb/workflow_archive.go @@ -3,6 +3,7 @@ package sqldb import ( "context" "encoding/json" + "fmt" "time" log "github.com/sirupsen/logrus" @@ -47,18 +48,20 @@ type WorkflowArchive interface { ListWorkflows(namespace string, minStartAt, maxStartAt time.Time, labelRequirements labels.Requirements, limit, offset int) (wfv1.Workflows, error) GetWorkflow(uid string) (*wfv1.Workflow, error) DeleteWorkflow(uid string) error + DeleteWorkflows(ttl time.Duration) error } type workflowArchive struct { - session sqlbuilder.Database - clusterName string - instanceID string - dbType dbType + session sqlbuilder.Database + clusterName string + managedNamespace string + instanceID string + dbType dbType } // NewWorkflowArchive returns a new workflowArchive -func NewWorkflowArchive(session sqlbuilder.Database, clusterName string, instanceID string) WorkflowArchive { - return &workflowArchive{session: session, clusterName: clusterName, instanceID: instanceID, dbType: dbTypeFor(session)} +func NewWorkflowArchive(session sqlbuilder.Database, clusterName, managedNamespace, instanceID string) WorkflowArchive { + return &workflowArchive{session: session, clusterName: clusterName, managedNamespace: managedNamespace, instanceID: instanceID, dbType: dbTypeFor(session)} } func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error { @@ -71,7 +74,7 @@ func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error { return r.session.Tx(context.Background(), func(sess sqlbuilder.Tx) error { _, err := sess. DeleteFrom(archiveTableName). - Where(db.Cond{"clustername": r.clusterName}). + Where(r.clusterManagedNamespaceAndInstanceID()). And(db.Cond{"uid": wf.UID}). Exec() if err != nil { @@ -121,8 +124,7 @@ func (r *workflowArchive) ListWorkflows(namespace string, minStartedAt, maxStart err = r.session. Select("name", "namespace", "uid", "phase", "startedat", "finishedat"). From(archiveTableName). - Where(db.Cond{"clustername": r.clusterName}). - And(db.Cond{"instanceid": r.instanceID}). + Where(r.clusterManagedNamespaceAndInstanceID()). And(namespaceEqual(namespace)). And(startedAtClause(minStartedAt, maxStartedAt)). And(clause). @@ -152,6 +154,14 @@ func (r *workflowArchive) ListWorkflows(namespace string, minStartedAt, maxStart return wfs, nil } +func (r *workflowArchive) clusterManagedNamespaceAndInstanceID() db.Compound { + return db.And( + db.Cond{"clustername": r.clusterName}, + namespaceEqual(r.managedNamespace), + db.Cond{"instanceid": r.instanceID}, + ) +} + func startedAtClause(from, to time.Time) db.Compound { var conds []db.Compound if !from.IsZero() { @@ -176,8 +186,7 @@ func (r *workflowArchive) GetWorkflow(uid string) (*wfv1.Workflow, error) { err := r.session. Select("workflow"). From(archiveTableName). - Where(db.Cond{"clustername": r.clusterName}). - And(db.Cond{"instanceid": r.instanceID}). + Where(r.clusterManagedNamespaceAndInstanceID()). And(db.Cond{"uid": uid}). One(archivedWf) if err != nil { @@ -197,8 +206,7 @@ func (r *workflowArchive) GetWorkflow(uid string) (*wfv1.Workflow, error) { func (r *workflowArchive) DeleteWorkflow(uid string) error { rs, err := r.session. DeleteFrom(archiveTableName). - Where(db.Cond{"clustername": r.clusterName}). - And(db.Cond{"instanceid": r.instanceID}). + Where(r.clusterManagedNamespaceAndInstanceID()). And(db.Cond{"uid": uid}). Exec() if err != nil { @@ -211,3 +219,20 @@ func (r *workflowArchive) DeleteWorkflow(uid string) error { log.WithFields(log.Fields{"uid": uid, "rowsAffected": rowsAffected}).Debug("Deleted archived workflow") return nil } + +func (r *workflowArchive) DeleteWorkflows(ttl time.Duration) error { + rs, err := r.session. + DeleteFrom(archiveTableName). + Where(r.clusterManagedNamespaceAndInstanceID()). + And(fmt.Sprintf("finishedat < current_timestamp - interval '%d' second", int(ttl.Seconds()))). + Exec() + if err != nil { + return err + } + rowsAffected, err := rs.RowsAffected() + if err != nil { + return err + } + log.WithFields(log.Fields{"rowsAffected": rowsAffected}).Info("Deleted archived workflows") + return nil +} \ No newline at end of file diff --git a/server/apiserver/argoserver.go b/server/apiserver/argoserver.go index 3a76c112dd66..cc8f61d3f736 100644 --- a/server/apiserver/argoserver.go +++ b/server/apiserver/argoserver.go @@ -129,7 +129,7 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st } // we always enable the archive for the Argo Server, as the Argo Server does not write records, so you can // disable the archiving - and still read old records - wfArchive = sqldb.NewWorkflowArchive(session, persistence.GetClusterName(), configMap.InstanceID) + wfArchive = sqldb.NewWorkflowArchive(session, persistence.GetClusterName(), as.managedNamespace, configMap.InstanceID) } artifactServer := artifacts.NewArtifactServer(as.authenticator, offloadRepo, wfArchive) grpcServer := as.newGRPCServer(configMap.InstanceID, offloadRepo, wfArchive, configMap.Links) diff --git a/sonar-project.properties b/sonar-project.properties index 91323c73fd9d..a2bcf77fe77e 100644 --- a/sonar-project.properties +++ b/sonar-project.properties @@ -4,5 +4,8 @@ sonar.projectName=Argo Workflows sonar.projectVersion=2.x sonar.host.url=https://sonarcloud.io sonar.exclusions=**/*.pb*.go,**/*generated*.go,,**/*_test.go,**/mocks/*,,**/vendor/**,**/ui/** +sonar.coverage.exclusions=\ + persist/**,\ + test/** sonar.go.tests.reportPath=test-results/test-report.json sonar.go.coverage.reportPaths=coverage.out diff --git a/test/e2e/fixtures/persistence.go b/test/e2e/fixtures/persistence.go index 07b95a33ce8c..07e7cb101b98 100644 --- a/test/e2e/fixtures/persistence.go +++ b/test/e2e/fixtures/persistence.go @@ -36,7 +36,7 @@ func newPersistence(kubeClient kubernetes.Interface) *Persistence { if err != nil { panic(err) } - workflowArchive := sqldb.NewWorkflowArchive(session, persistence.GetClusterName(), wcConfig.InstanceID) + workflowArchive := sqldb.NewWorkflowArchive(session, persistence.GetClusterName(), Namespace, wcConfig.InstanceID) return &Persistence{session, offloadNodeStatusRepo, workflowArchive} } else { return &Persistence{offloadNodeStatusRepo: sqldb.ExplosiveOffloadNodeStatusRepo, workflowArchive: sqldb.NullWorkflowArchive} diff --git a/test/e2e/manifests/mixins/workflow-controller-deployment.yaml b/test/e2e/manifests/mixins/workflow-controller-deployment.yaml index 835dd4c0a65a..3e9d11b04abb 100644 --- a/test/e2e/manifests/mixins/workflow-controller-deployment.yaml +++ b/test/e2e/manifests/mixins/workflow-controller-deployment.yaml @@ -24,3 +24,5 @@ spec: value: 30s - name: UPPERIO_DB_DEBUG value: "1" + - name: ARCHIVED_WORKFLOW_GC_PERIOD + value: 30s \ No newline at end of file diff --git a/util/api/util.go b/util/api/util.go deleted file mode 100644 index 3fbedae4f28b..000000000000 --- a/util/api/util.go +++ /dev/null @@ -1,18 +0,0 @@ -package api - -import ( - "context" - - "github.com/argoproj/argo/pkg/apiclient/workflow" - wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" -) - -func SubmitWorkflowToAPIServer(apiGRPCClient workflow.WorkflowServiceClient, ctx context.Context, wf *wfv1.Workflow, dryRun bool) (*wfv1.Workflow, error) { - - wfReq := workflow.WorkflowCreateRequest{ - Namespace: wf.Namespace, - Workflow: wf, - ServerDryRun: dryRun, - } - return apiGRPCClient.CreateWorkflow(ctx, &wfReq) -} diff --git a/workflow/controller/config.go b/workflow/controller/config.go index 0ca3f22d142f..6886aea81415 100644 --- a/workflow/controller/config.go +++ b/workflow/controller/config.go @@ -57,7 +57,7 @@ func (wfc *WorkflowController) updateConfig(config config.Config) error { log.Info("Node status offloading is disabled") } if persistence.Archive { - wfc.wfArchive = sqldb.NewWorkflowArchive(session, persistence.GetClusterName(), wfc.Config.InstanceID) + wfc.wfArchive = sqldb.NewWorkflowArchive(session, persistence.GetClusterName(), wfc.managedNamespace, wfc.Config.InstanceID) log.Info("Workflow archiving is enabled") } else { log.Info("Workflow archiving is disabled") diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index acaac08592a3..1aa671e88b9f 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -185,7 +185,8 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, podWorkers in go wfc.podInformer.Run(ctx.Done()) go wfc.podLabeler(ctx.Done()) go wfc.podGarbageCollector(ctx.Done()) - go wfc.periodicWorkflowGarbageCollector(ctx.Done()) + go wfc.workflowGarbageCollector(ctx.Done()) + go wfc.archivedWorkflowGarbageCollector(ctx.Done()) wfc.createClusterWorkflowTemplateInformer(ctx) @@ -288,7 +289,7 @@ func (wfc *WorkflowController) podGarbageCollector(stopCh <-chan struct{}) { } } -func (wfc *WorkflowController) periodicWorkflowGarbageCollector(stopCh <-chan struct{}) { +func (wfc *WorkflowController) workflowGarbageCollector(stopCh <-chan struct{}) { value, ok := os.LookupEnv("WORKFLOW_GC_PERIOD") periodicity := 5 * time.Minute if ok { @@ -349,6 +350,46 @@ func (wfc *WorkflowController) periodicWorkflowGarbageCollector(stopCh <-chan st } } +func (wfc *WorkflowController) archivedWorkflowGarbageCollector(stopCh <-chan struct{}) { + value, ok := os.LookupEnv("ARCHIVED_WORKFLOW_GC_PERIOD") + periodicity := 24 * time.Hour + if ok { + var err error + periodicity, err = time.ParseDuration(value) + if err != nil { + log.WithFields(log.Fields{"err": err, "value": value}).Fatal("Failed to parse ARCHIVED_WORKFLOW_GC_PERIOD") + } + } + if wfc.Config.Persistence == nil { + log.Info("Persistence disabled - so archived workflow GC disabled - you must restart the controller if you enable this") + return + } + if !wfc.Config.Persistence.Archive { + log.Info("Archive disabled - so archived workflow GC disabled - you must restart the controller if you enable this") + return + } + ttl := wfc.Config.Persistence.ArchiveTTL + if ttl == config.TTL(0) { + log.Info("Archived workflows TTL zero - so archived workflow GC disabled - you must restart the controller if you enable this") + return + } + log.WithFields(log.Fields{"ttl": ttl, "periodicity": periodicity}).Info("Performing archived workflow GC") + ticker := time.NewTicker(periodicity) + defer ticker.Stop() + for { + select { + case <-stopCh: + return + case <-ticker.C: + log.Info("Performing archived workflow GC") + err := wfc.wfArchive.DeleteWorkflows(time.Duration(ttl)) + if err != nil { + log.WithField("err", err).Error("Failed to delete archived workflows") + } + } + } +} + func (wfc *WorkflowController) runWorker() { for wfc.processNextItem() { } diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index 580452fcf6f7..ff7920495e4c 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -319,3 +319,10 @@ func TestClusterController(t *testing.T) { controller.createClusterWorkflowTemplateInformer(context.TODO()) assert.NotNil(t, controller.cwftmplInformer) } + +func TestWorkflowController_archivedWorkflowGarbageCollector(t *testing.T) { + cancel, controller := newController() + defer cancel() + + controller.archivedWorkflowGarbageCollector(make(chan struct{})) +}