diff --git a/README.md b/README.md index d14cec1..f34ebf0 100644 --- a/README.md +++ b/README.md @@ -70,6 +70,15 @@ Finally, initialize the processor: // logger is assumed to be a zerolog instance. processor := nanoq.NewProcessor(nanoq.NewClient(db), logger) +// The default retry policy uses an exponential backoff with jitter, +// but callers can provide their own if necessary. +processor.RetryPolicy(func (t nanoq.Task) { + // First retry in 5s, every next retry in 1h. + if t.Retries == 0 { + return 5 * time.Second + } + return 1 * time.Hour +}) processor.OnError(func(ctx context.Context, t nanoq.Task, err error) { // Log each failed task. // Idea: Send to Sentry when t.Retries == t.MaxRetries. diff --git a/nanoq.go b/nanoq.go index 674af6c..6324d1b 100644 --- a/nanoq.go +++ b/nanoq.go @@ -180,8 +180,11 @@ func (c *Client) ClaimTask(ctx context.Context, tx *sqlx.Tx) (Task, error) { return t, nil } -// UpdateTask updates the given task. -func (c *Client) UpdateTask(ctx context.Context, tx *sqlx.Tx, t Task) error { +// RetryTask schedules a retry of the given task. +func (c *Client) RetryTask(ctx context.Context, tx *sqlx.Tx, t Task, retryIn time.Duration) error { + t.Retries++ + t.ScheduledAt = time.Now().UTC().Add(retryIn) + _, err := tx.NamedExecContext(ctx, `UPDATE tasks SET retries = :retries, scheduled_at = :scheduled_at WHERE id = :id`, t) if err != nil { return err @@ -247,8 +250,20 @@ type ( // Middleware wrap a handler in order to run logic before/after it. Middleware func(next Handler) Handler + + // RetryPolicy determines the retry delay for a given task. + RetryPolicy func(t Task) time.Duration ) +// DefaultRetryPolicy uses an exponential base delay with jitter. +// Approximate examples: 7s, 50s, 5min, 20min, 50min, 2h, 4h, 9h, 16h, 27h. +func DefaultRetryPolicy(t Task) time.Duration { + exp := 5 + int(math.Pow(float64(t.Retries+1), float64(5))) + s := exp + rand.IntN(exp/2) + + return time.Duration(s) * time.Second +} + // Processor represents the queue processor. type Processor struct { client *Client @@ -257,6 +272,7 @@ type Processor struct { errorHandler ErrorHandler handlers map[string]Handler middleware []Middleware + retryPolicy RetryPolicy done atomic.Bool } @@ -264,10 +280,11 @@ type Processor struct { // NewProcessor creates a new processor. func NewProcessor(client *Client, logger zerolog.Logger) *Processor { return &Processor{ - client: client, - logger: logger, - handlers: make(map[string]Handler), - middleware: make([]Middleware, 0), + client: client, + logger: logger, + handlers: make(map[string]Handler), + middleware: make([]Middleware, 0), + retryPolicy: DefaultRetryPolicy, } } @@ -290,6 +307,11 @@ func (p *Processor) Handle(taskType string, h Handler, ms ...Middleware) { p.handlers[taskType] = h } +// RetryPolicy registers the given retry policy. +func (p *Processor) RetryPolicy(rp RetryPolicy) { + p.retryPolicy = rp +} + // Run starts the processor and blocks until a shutdown signal (SIGINT/SIGTERM) is received. // // Once the shutdown signal is received, workers stop claiming new tasks. @@ -369,10 +391,8 @@ func (p *Processor) process(ctx context.Context) error { p.errorHandler(ctx, t, err) } if t.Retries < t.MaxRetries && !errors.Is(err, ErrSkipRetry) { - t.Retries = t.Retries + 1 - t.ScheduledAt = getNextRetryTime(int(t.Retries)) - - if err := p.client.UpdateTask(ctx, tx, t); err != nil { + retryIn := p.retryPolicy(t) + if err := p.client.RetryTask(ctx, tx, t, retryIn); err != nil { return fmt.Errorf("update task %v: %w", t.ID, err) } @@ -411,14 +431,3 @@ func callHandler(ctx context.Context, h Handler, t Task) (err error) { return h(ctx, t) } - -// getNextRetryTime returns the time of the next retry. -// Uses an exponential base delay with jitter. -// Approximate examples: 7s, 50s, 5min, 20min, 50min, 2h, 4h, 9h, 16h, 27h. -func getNextRetryTime(n int) time.Time { - exp := 5 + int(math.Pow(float64(n), float64(5))) - s := exp + rand.IntN(exp/2) - d := time.Duration(s) * time.Second - - return time.Now().UTC().Add(d) -} diff --git a/nanoq_test.go b/nanoq_test.go index 619038a..e4a21c5 100644 --- a/nanoq_test.go +++ b/nanoq_test.go @@ -152,6 +152,11 @@ func TestProcessor_Run(t *testing.T) { defer db.Close() client := nanoq.NewClient(sqlx.NewDb(db, "sqlmock")) processor := nanoq.NewProcessor(client, zerolog.Nop()) + retryPolicyCalled := 0 + processor.RetryPolicy(func(t nanoq.Task) time.Duration { + retryPolicyCalled++ + return 1 * time.Second + }) processor.Handle("my-type", func(ctx context.Context, task nanoq.Task) error { // Fail the task once. if task.Retries == 0 { @@ -199,6 +204,10 @@ func TestProcessor_Run(t *testing.T) { if errorHandlerCalled != 1 { t.Errorf("erorr handler called %v times instead of %v", errorHandlerCalled, 1) } + + if retryPolicyCalled != 1 { + t.Errorf("retry policy called %v times instead of %v", retryPolicyCalled, 1) + } } func TestProcessor_Run_RetriesExhausted(t *testing.T) {