Commit 1f14f2a

feat(archive): Implement data retention. Closes argoproj#2273 (argopr…

alexec authored Apr 23, 2020
1 parent d0cc776 commit 1f14f2a

Showing 20 changed files with 201 additions and 37 deletions.
4 changes: 3 additions & 1 deletion config/config.go

@@ -131,7 +131,9 @@ func (a *ArtifactRepository) IsArchiveLogs() bool {
 type PersistConfig struct {
 	NodeStatusOffload bool `json:"nodeStatusOffLoad,omitempty"`
 	// Archive workflows to persistence.
-	Archive bool `json:"archive,omitempty"`
+	Archive bool `json:"archive,omitempty"`
+	// in days
+	ArchiveTTL TTL `json:"archiveTTL,omitempty"`
 	ClusterName string `json:"clusterName,omitempty"`
 	ConnectionPool *ConnectionPool `json:"connectionPool"`
 	PostgreSQL *PostgreSQLConfig `json:"postgresql,omitempty"`
44 changes: 44 additions & 0 deletions config/ttl.go

@@ -0,0 +1,44 @@
package config

import (
	"encoding/json"
	"errors"
	"strconv"
	"strings"
	"time"
)

// time.Duration forces you to specify in millis, and does not support days
// see https://stackoverflow.com/questions/48050945/how-to-unmarshal-json-into-durations
type TTL time.Duration

func (l TTL) MarshalJSON() ([]byte, error) {
	return json.Marshal(time.Duration(l).String())
}

func (l *TTL) UnmarshalJSON(b []byte) error {
	var v interface{}
	if err := json.Unmarshal(b, &v); err != nil {
		return err
	}
	switch value := v.(type) {
	case string:
		if value == "" {
			*l = 0
			return nil
		}
		if strings.HasSuffix(value, "d") {
			hours, err := strconv.Atoi(strings.TrimSuffix(value, "d"))
			*l = TTL(time.Duration(hours) * 24 * time.Hour)
			return err
		}
		d, err := time.ParseDuration(value)
		if err != nil {
			return err
		}
		*l = TTL(d)
		return nil
	default:
		return errors.New("invalid TTL")
	}
}
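For a sense of how the custom unmarshaller behaves, here is a minimal, hypothetical round-trip — assuming this package is importable as github.com/argoproj/argo/config, the module path at the time:

package main

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/argoproj/argo/config"
)

func main() {
	var ttl config.TTL
	// the "d" suffix branch converts whole days to hours; plain Go durations such as "36h" also parse
	if err := json.Unmarshal([]byte(`"180d"`), &ttl); err != nil {
		panic(err)
	}
	fmt.Println(time.Duration(ttl)) // 4320h0m0s
}

Note that despite the "in days" comment on ArchiveTTL, any value time.ParseDuration accepts ("1h", "30m") is also valid; only the "d" suffix gets special handling.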
32 changes: 32 additions & 0 deletions config/ttl_test.go

@@ -0,0 +1,32 @@
package config

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

func TestTTL(t *testing.T) {
	t.Run("Empty", func(t *testing.T) {
		ttl := TTL(-1)
		err := ttl.UnmarshalJSON([]byte(`""`))
		if assert.NoError(t, err) {
			assert.Equal(t, TTL(0), ttl)
		}
	})
	t.Run("1h", func(t *testing.T) {
		ttl := TTL(-1)
		err := ttl.UnmarshalJSON([]byte(`"1h"`))
		if assert.NoError(t, err) {
			assert.Equal(t, TTL(1*time.Hour), ttl)
		}
	})
	t.Run("1d", func(t *testing.T) {
		ttl := TTL(-1)
		err := ttl.UnmarshalJSON([]byte(`"1d"`))
		if assert.NoError(t, err) {
			assert.Equal(t, TTL(24*time.Hour), ttl)
		}
	})
}
2 changes: 2 additions & 0 deletions docs/workflow-controller-configmap.yaml

@@ -143,6 +143,8 @@ data:
     nodeStatusOffLoad: false
     # save completed workloads to the workflow archive
     archive: false
+    # the number of days to keep archived workflows (the default is forever)
+    archiveTTL: 180d
     # Optional name of the cluster I'm running in. This must be unique for your cluster.
     clusterName: default
     postgresql:
1 change: 1 addition & 0 deletions manifests/quick-start-mysql.yaml

@@ -369,6 +369,7 @@ data:
       maxOpenConns: 0
     nodeStatusOffLoad: true
     archive: true
+    archiveTTL: 7d
     mysql:
       host: mysql
       port: 3306
1 change: 1 addition & 0 deletions manifests/quick-start-postgres.yaml

@@ -369,6 +369,7 @@ data:
       maxOpenConns: 0
     nodeStatusOffLoad: true
     archive: true
+    archiveTTL: 7d
    postgresql:
       host: postgres
       port: 5432
@@ -6,6 +6,7 @@ data:
       maxOpenConns: 0
     nodeStatusOffLoad: true
     archive: true
+    archiveTTL: 7d
     mysql:
       host: mysql
       port: 3306
@@ -6,6 +6,7 @@ data:
       maxOpenConns: 0
     nodeStatusOffLoad: true
     archive: true
+    archiveTTL: 7d
     postgresql:
       host: postgres
       port: 5432
2 changes: 2 additions & 0 deletions persist/sqldb/migrate.go

@@ -247,6 +247,8 @@ func (m migrate) Exec(ctx context.Context) error {
 		),
 		// add m.tableName index
 		ansiSQLChange(`create index ` + m.tableName + `_i1 on ` + m.tableName + ` (clustername,namespace,updatedat)`),
+		// index to find records that need deleting, this omits namespaces as this might be null
+		ansiSQLChange(`create index argo_archived_workflows_i2 on argo_archived_workflows (clustername,instanceid,finishedat)`),
 	} {
 		err := m.applyChange(ctx, changeSchemaVersion, change)
 		if err != nil {
14 changes: 14 additions & 0 deletions persist/sqldb/mocks/WorkflowArchive.go

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions persist/sqldb/null_workflow_archive.go

@@ -29,3 +29,7 @@ func (r *nullWorkflowArchive) GetWorkflow(string) (*wfv1.Workflow, error) {
 func (r *nullWorkflowArchive) DeleteWorkflow(string) error {
 	return fmt.Errorf("deleting archived workflows not supported")
 }
+
+func (r *nullWorkflowArchive) DeleteWorkflows(time.Duration) error {
+	return nil
+}
51 changes: 38 additions & 13 deletions persist/sqldb/workflow_archive.go

@@ -3,6 +3,7 @@ package sqldb
 import (
 	"context"
 	"encoding/json"
+	"fmt"
 	"time"

 	log "github.com/sirupsen/logrus"
@@ -47,18 +48,20 @@ type WorkflowArchive interface {
 	ListWorkflows(namespace string, minStartAt, maxStartAt time.Time, labelRequirements labels.Requirements, limit, offset int) (wfv1.Workflows, error)
 	GetWorkflow(uid string) (*wfv1.Workflow, error)
 	DeleteWorkflow(uid string) error
+	DeleteWorkflows(ttl time.Duration) error
 }

 type workflowArchive struct {
-	session     sqlbuilder.Database
-	clusterName string
-	instanceID  string
-	dbType      dbType
+	session          sqlbuilder.Database
+	clusterName      string
+	managedNamespace string
+	instanceID       string
+	dbType           dbType
 }

 // NewWorkflowArchive returns a new workflowArchive
-func NewWorkflowArchive(session sqlbuilder.Database, clusterName string, instanceID string) WorkflowArchive {
-	return &workflowArchive{session: session, clusterName: clusterName, instanceID: instanceID, dbType: dbTypeFor(session)}
+func NewWorkflowArchive(session sqlbuilder.Database, clusterName, managedNamespace, instanceID string) WorkflowArchive {
+	return &workflowArchive{session: session, clusterName: clusterName, managedNamespace: managedNamespace, instanceID: instanceID, dbType: dbTypeFor(session)}
 }

@@ -71,7 +74,7 @@ func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error {
 	return r.session.Tx(context.Background(), func(sess sqlbuilder.Tx) error {
 		_, err := sess.
 			DeleteFrom(archiveTableName).
-			Where(db.Cond{"clustername": r.clusterName}).
+			Where(r.clusterManagedNamespaceAndInstanceID()).
 			And(db.Cond{"uid": wf.UID}).
 			Exec()
 		if err != nil {
@@ -121,8 +124,7 @@ func (r *workflowArchive) ListWorkflows(namespace string, minStartedAt, maxStart
 	err = r.session.
 		Select("name", "namespace", "uid", "phase", "startedat", "finishedat").
 		From(archiveTableName).
-		Where(db.Cond{"clustername": r.clusterName}).
-		And(db.Cond{"instanceid": r.instanceID}).
+		Where(r.clusterManagedNamespaceAndInstanceID()).
 		And(namespaceEqual(namespace)).
 		And(startedAtClause(minStartedAt, maxStartedAt)).
 		And(clause).
@@ -152,6 +154,14 @@ func (r *workflowArchive) ListWorkflows(namespace string, minStartedAt, maxStart
 	return wfs, nil
 }

+func (r *workflowArchive) clusterManagedNamespaceAndInstanceID() db.Compound {
+	return db.And(
+		db.Cond{"clustername": r.clusterName},
+		namespaceEqual(r.managedNamespace),
+		db.Cond{"instanceid": r.instanceID},
+	)
+}
+
 func startedAtClause(from, to time.Time) db.Compound {
 	var conds []db.Compound
 	if !from.IsZero() {
@@ -176,8 +186,7 @@ func (r *workflowArchive) GetWorkflow(uid string) (*wfv1.Workflow, error) {
 	err := r.session.
 		Select("workflow").
 		From(archiveTableName).
-		Where(db.Cond{"clustername": r.clusterName}).
-		And(db.Cond{"instanceid": r.instanceID}).
+		Where(r.clusterManagedNamespaceAndInstanceID()).
 		And(db.Cond{"uid": uid}).
 		One(archivedWf)
 	if err != nil {
@@ -197,8 +206,7 @@ func (r *workflowArchive) GetWorkflow(uid string) (*wfv1.Workflow, error) {
 func (r *workflowArchive) DeleteWorkflow(uid string) error {
 	rs, err := r.session.
 		DeleteFrom(archiveTableName).
-		Where(db.Cond{"clustername": r.clusterName}).
-		And(db.Cond{"instanceid": r.instanceID}).
+		Where(r.clusterManagedNamespaceAndInstanceID()).
 		And(db.Cond{"uid": uid}).
 		Exec()
 	if err != nil {
@@ -211,3 +219,20 @@ func (r *workflowArchive) DeleteWorkflow(uid string) error {
 	log.WithFields(log.Fields{"uid": uid, "rowsAffected": rowsAffected}).Debug("Deleted archived workflow")
 	return nil
 }
+
+func (r *workflowArchive) DeleteWorkflows(ttl time.Duration) error {
+	rs, err := r.session.
+		DeleteFrom(archiveTableName).
+		Where(r.clusterManagedNamespaceAndInstanceID()).
+		And(fmt.Sprintf("finishedat < current_timestamp - interval '%d' second", int(ttl.Seconds()))).
+		Exec()
+	if err != nil {
+		return err
+	}
+	rowsAffected, err := rs.RowsAffected()
+	if err != nil {
+		return err
+	}
+	log.WithFields(log.Fields{"rowsAffected": rowsAffected}).Info("Deleted archived workflows")
+	return nil
+}
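To make the generated predicate concrete: with the quick-starts' archiveTTL: 7d, int(ttl.Seconds()) is 604800, so the delete clause becomes finishedat < current_timestamp - interval '604800' second. A minimal check of that arithmetic:

package main

import (
	"fmt"
	"time"
)

func main() {
	ttl := 7 * 24 * time.Hour // what "7d" parses to
	fmt.Printf("finishedat < current_timestamp - interval '%d' second\n", int(ttl.Seconds()))
	// finishedat < current_timestamp - interval '604800' second
}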
2 changes: 1 addition & 1 deletion server/apiserver/argoserver.go

@@ -129,7 +129,7 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st
 	}
 		// we always enable the archive for the Argo Server, as the Argo Server does not write records, so you can
 		// disable the archiving - and still read old records
-		wfArchive = sqldb.NewWorkflowArchive(session, persistence.GetClusterName(), configMap.InstanceID)
+		wfArchive = sqldb.NewWorkflowArchive(session, persistence.GetClusterName(), as.managedNamespace, configMap.InstanceID)
 	}
 	artifactServer := artifacts.NewArtifactServer(as.authenticator, offloadRepo, wfArchive)
 	grpcServer := as.newGRPCServer(configMap.InstanceID, offloadRepo, wfArchive, configMap.Links)
3 changes: 3 additions & 0 deletions sonar-project.properties

@@ -4,5 +4,8 @@ sonar.projectName=Argo Workflows
 sonar.projectVersion=2.x
 sonar.host.url=https://sonarcloud.io
 sonar.exclusions=**/*.pb*.go,**/*generated*.go,,**/*_test.go,**/mocks/*,,**/vendor/**,**/ui/**
+sonar.coverage.exclusions=\
+  persist/**,\
+  test/**
 sonar.go.tests.reportPath=test-results/test-report.json
 sonar.go.coverage.reportPaths=coverage.out
2 changes: 1 addition & 1 deletion test/e2e/fixtures/persistence.go

@@ -36,7 +36,7 @@ func newPersistence(kubeClient kubernetes.Interface) *Persistence {
 		if err != nil {
 			panic(err)
 		}
-		workflowArchive := sqldb.NewWorkflowArchive(session, persistence.GetClusterName(), wcConfig.InstanceID)
+		workflowArchive := sqldb.NewWorkflowArchive(session, persistence.GetClusterName(), Namespace, wcConfig.InstanceID)
 		return &Persistence{session, offloadNodeStatusRepo, workflowArchive}
 	} else {
 		return &Persistence{offloadNodeStatusRepo: sqldb.ExplosiveOffloadNodeStatusRepo, workflowArchive: sqldb.NullWorkflowArchive}
2 changes: 2 additions & 0 deletions test/e2e/manifests/mixins/workflow-controller-deployment.yaml

@@ -24,3 +24,5 @@ spec:
           value: 30s
         - name: UPPERIO_DB_DEBUG
           value: "1"
+        - name: ARCHIVED_WORKFLOW_GC_PERIOD
+          value: 30s
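This env var shortens the archive GC interval so e2e tests see deletions quickly. The controller loop that consumes it is in a file this capture does not render, so the following is a hedged sketch only — the helper names, import paths, and default period are assumptions, not the commit's actual code:

package controller

import (
	"os"
	"time"

	log "github.com/sirupsen/logrus"

	"github.com/argoproj/argo/config"
	"github.com/argoproj/argo/persist/sqldb"
)

// hypothetical helper: read the GC period from the environment, falling back to a default
func archivedWorkflowGCPeriod() time.Duration {
	if v := os.Getenv("ARCHIVED_WORKFLOW_GC_PERIOD"); v != "" {
		if d, err := time.ParseDuration(v); err == nil {
			return d
		}
	}
	return 24 * time.Hour // assumed default, for illustration only
}

// hypothetical GC loop: periodically delete archived workflows older than the configured TTL
func runArchivedWorkflowGC(wfArchive sqldb.WorkflowArchive, ttl config.TTL) {
	if ttl == 0 {
		return // a zero TTL means "keep forever", so never delete
	}
	ticker := time.NewTicker(archivedWorkflowGCPeriod())
	defer ticker.Stop()
	for range ticker.C {
		if err := wfArchive.DeleteWorkflows(time.Duration(ttl)); err != nil {
			log.WithError(err).Error("failed to delete archived workflows")
		}
	}
}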
18 changes: 0 additions & 18 deletions util/api/util.go

This file was deleted.

2 changes: 1 addition & 1 deletion workflow/controller/config.go

@@ -57,7 +57,7 @@ func (wfc *WorkflowController) updateConfig(config config.Config) error {
 		log.Info("Node status offloading is disabled")
 	}
 	if persistence.Archive {
-		wfc.wfArchive = sqldb.NewWorkflowArchive(session, persistence.GetClusterName(), wfc.Config.InstanceID)
+		wfc.wfArchive = sqldb.NewWorkflowArchive(session, persistence.GetClusterName(), wfc.managedNamespace, wfc.Config.InstanceID)
 		log.Info("Workflow archiving is enabled")
 	} else {
 		log.Info("Workflow archiving is disabled")