From 5273f66eee33e79f0a2824b6fbfd55f22eb48f44 Mon Sep 17 00:00:00 2001 From: marco Date: Fri, 13 Sep 2024 13:16:53 +0200 Subject: [PATCH] context propagation: pkg/database/flush --- cmd/crowdsec-cli/clialert/alerts.go | 6 ++-- pkg/apiserver/apiserver.go | 2 +- pkg/database/flush.go | 43 +++++++++++++++-------------- 3 files changed, 27 insertions(+), 24 deletions(-) diff --git a/cmd/crowdsec-cli/clialert/alerts.go b/cmd/crowdsec-cli/clialert/alerts.go index 757a84927e5..0bd0c0c1574 100644 --- a/cmd/crowdsec-cli/clialert/alerts.go +++ b/cmd/crowdsec-cli/clialert/alerts.go @@ -575,15 +575,17 @@ func (cli *cliAlerts) newFlushCmd() *cobra.Command { DisableAutoGenTag: true, RunE: func(cmd *cobra.Command, _ []string) error { cfg := cli.cfg() + ctx := cmd.Context() + if err := require.LAPI(cfg); err != nil { return err } - db, err := require.DBClient(cmd.Context(), cfg.DbConfig) + db, err := require.DBClient(ctx, cfg.DbConfig) if err != nil { return err } log.Info("Flushing alerts. !! This may take a long time !!") - err = db.FlushAlerts(maxAge, maxItems) + err = db.FlushAlerts(ctx, maxAge, maxItems) if err != nil { return fmt.Errorf("unable to flush alerts: %w", err) } diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 42dcb219379..c73e854fc11 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -170,7 +170,7 @@ func NewServer(config *csconfig.LocalApiServerCfg) (*APIServer, error) { } if config.DbConfig.Flush != nil { - flushScheduler, err = dbClient.StartFlushScheduler(config.DbConfig.Flush) + flushScheduler, err = dbClient.StartFlushScheduler(ctx, config.DbConfig.Flush) if err != nil { return nil, err } diff --git a/pkg/database/flush.go b/pkg/database/flush.go index 5d53d10c942..46c8edfa308 100644 --- a/pkg/database/flush.go +++ b/pkg/database/flush.go @@ -1,6 +1,7 @@ package database import ( + "context" "errors" "fmt" "time" @@ -26,7 +27,7 @@ const ( flushInterval = 1 * time.Minute ) -func (c *Client) StartFlushScheduler(config *csconfig.FlushDBCfg) (*gocron.Scheduler, error) { +func (c *Client) StartFlushScheduler(ctx context.Context, config *csconfig.FlushDBCfg) (*gocron.Scheduler, error) { maxItems := 0 maxAge := "" @@ -45,7 +46,7 @@ func (c *Client) StartFlushScheduler(config *csconfig.FlushDBCfg) (*gocron.Sched // Init & Start cronjob every minute for alerts scheduler := gocron.NewScheduler(time.UTC) - job, err := scheduler.Every(1).Minute().Do(c.FlushAlerts, maxAge, maxItems) + job, err := scheduler.Every(1).Minute().Do(c.FlushAlerts, ctx, maxAge, maxItems) if err != nil { return nil, fmt.Errorf("while starting FlushAlerts scheduler: %w", err) } @@ -100,14 +101,14 @@ func (c *Client) StartFlushScheduler(config *csconfig.FlushDBCfg) (*gocron.Sched } } - baJob, err := scheduler.Every(flushInterval).Do(c.FlushAgentsAndBouncers, config.AgentsGC, config.BouncersGC) + baJob, err := scheduler.Every(flushInterval).Do(c.FlushAgentsAndBouncers, ctx, config.AgentsGC, config.BouncersGC) if err != nil { return nil, fmt.Errorf("while starting FlushAgentsAndBouncers scheduler: %w", err) } baJob.SingletonMode() - metricsJob, err := scheduler.Every(flushInterval).Do(c.flushMetrics, config.MetricsMaxAge) + metricsJob, err := scheduler.Every(flushInterval).Do(c.flushMetrics, ctx, config.MetricsMaxAge) if err != nil { return nil, fmt.Errorf("while starting flushMetrics scheduler: %w", err) } @@ -120,7 +121,7 @@ func (c *Client) StartFlushScheduler(config *csconfig.FlushDBCfg) (*gocron.Sched } // flushMetrics deletes metrics older than maxAge, regardless if they have been pushed to CAPI or not -func (c *Client) flushMetrics(maxAge *time.Duration) { +func (c *Client) flushMetrics(ctx context.Context, maxAge *time.Duration) { if maxAge == nil { maxAge = ptr.Of(defaultMetricsMaxAge) } @@ -129,7 +130,7 @@ func (c *Client) flushMetrics(maxAge *time.Duration) { deleted, err := c.Ent.Metric.Delete().Where( metric.ReceivedAtLTE(time.Now().UTC().Add(-*maxAge)), - ).Exec(c.CTX) + ).Exec(ctx) if err != nil { c.Log.Errorf("while flushing metrics: %s", err) return @@ -140,10 +141,10 @@ func (c *Client) flushMetrics(maxAge *time.Duration) { } } -func (c *Client) FlushOrphans() { +func (c *Client) FlushOrphans(ctx context.Context) { /* While it has only been linked to some very corner-case bug : https://github.com/crowdsecurity/crowdsec/issues/778 */ /* We want to take care of orphaned events for which the parent alert/decision has been deleted */ - eventsCount, err := c.Ent.Event.Delete().Where(event.Not(event.HasOwner())).Exec(c.CTX) + eventsCount, err := c.Ent.Event.Delete().Where(event.Not(event.HasOwner())).Exec(ctx) if err != nil { c.Log.Warningf("error while deleting orphan events: %s", err) return @@ -154,7 +155,7 @@ func (c *Client) FlushOrphans() { } eventsCount, err = c.Ent.Decision.Delete().Where( - decision.Not(decision.HasOwner())).Where(decision.UntilLTE(time.Now().UTC())).Exec(c.CTX) + decision.Not(decision.HasOwner())).Where(decision.UntilLTE(time.Now().UTC())).Exec(ctx) if err != nil { c.Log.Warningf("error while deleting orphan decisions: %s", err) return @@ -165,7 +166,7 @@ func (c *Client) FlushOrphans() { } } -func (c *Client) flushBouncers(authType string, duration *time.Duration) { +func (c *Client) flushBouncers(ctx context.Context, authType string, duration *time.Duration) { if duration == nil { return } @@ -174,7 +175,7 @@ func (c *Client) flushBouncers(authType string, duration *time.Duration) { bouncer.LastPullLTE(time.Now().UTC().Add(-*duration)), ).Where( bouncer.AuthTypeEQ(authType), - ).Exec(c.CTX) + ).Exec(ctx) if err != nil { c.Log.Errorf("while auto-deleting expired bouncers (%s): %s", authType, err) return @@ -185,7 +186,7 @@ func (c *Client) flushBouncers(authType string, duration *time.Duration) { } } -func (c *Client) flushAgents(authType string, duration *time.Duration) { +func (c *Client) flushAgents(ctx context.Context, authType string, duration *time.Duration) { if duration == nil { return } @@ -194,7 +195,7 @@ func (c *Client) flushAgents(authType string, duration *time.Duration) { machine.LastHeartbeatLTE(time.Now().UTC().Add(-*duration)), machine.Not(machine.HasAlerts()), machine.AuthTypeEQ(authType), - ).Exec(c.CTX) + ).Exec(ctx) if err != nil { c.Log.Errorf("while auto-deleting expired machines (%s): %s", authType, err) return @@ -205,23 +206,23 @@ func (c *Client) flushAgents(authType string, duration *time.Duration) { } } -func (c *Client) FlushAgentsAndBouncers(agentsCfg *csconfig.AuthGCCfg, bouncersCfg *csconfig.AuthGCCfg) error { +func (c *Client) FlushAgentsAndBouncers(ctx context.Context, agentsCfg *csconfig.AuthGCCfg, bouncersCfg *csconfig.AuthGCCfg) error { log.Debug("starting FlushAgentsAndBouncers") if agentsCfg != nil { - c.flushAgents(types.TlsAuthType, agentsCfg.CertDuration) - c.flushAgents(types.PasswordAuthType, agentsCfg.LoginPasswordDuration) + c.flushAgents(ctx, types.TlsAuthType, agentsCfg.CertDuration) + c.flushAgents(ctx, types.PasswordAuthType, agentsCfg.LoginPasswordDuration) } if bouncersCfg != nil { - c.flushBouncers(types.TlsAuthType, bouncersCfg.CertDuration) - c.flushBouncers(types.ApiKeyAuthType, bouncersCfg.ApiDuration) + c.flushBouncers(ctx, types.TlsAuthType, bouncersCfg.CertDuration) + c.flushBouncers(ctx, types.ApiKeyAuthType, bouncersCfg.ApiDuration) } return nil } -func (c *Client) FlushAlerts(MaxAge string, MaxItems int) error { +func (c *Client) FlushAlerts(ctx context.Context, MaxAge string, MaxItems int) error { var ( deletedByAge int deletedByNbItem int @@ -235,7 +236,7 @@ func (c *Client) FlushAlerts(MaxAge string, MaxItems int) error { } c.Log.Debug("Flushing orphan alerts") - c.FlushOrphans() + c.FlushOrphans(ctx) c.Log.Debug("Done flushing orphan alerts") totalAlerts, err = c.TotalAlerts() @@ -287,7 +288,7 @@ func (c *Client) FlushAlerts(MaxAge string, MaxItems int) error { if maxid > 0 { // This may lead to orphan alerts (at least on MySQL), but the next time the flush job will run, they will be deleted - deletedByNbItem, err = c.Ent.Alert.Delete().Where(alert.IDLT(maxid)).Exec(c.CTX) + deletedByNbItem, err = c.Ent.Alert.Delete().Where(alert.IDLT(maxid)).Exec(ctx) if err != nil { c.Log.Errorf("FlushAlerts: Could not delete alerts: %s", err) return fmt.Errorf("could not delete alerts: %w", err)