Skip to content

Commit

Permalink
Stop retrying tasks without a handler.
Browse files Browse the repository at this point in the history
  • Loading branch information
bojanz committed Apr 8, 2024
1 parent 4b3746d commit 909f0e2
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 1 deletion.
2 changes: 1 addition & 1 deletion nanoq.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func (p *Processor) process(ctx context.Context) error {
h, ok := p.handlers[t.Type]
if !ok {
h = func(ctx context.Context, t Task) error {
return fmt.Errorf("no handler not found for task type %v", t.Type)
return fmt.Errorf("no handler found for task type %v: %w", t.Type, ErrSkipRetry)
}
}
// Apply global middleware.
Expand Down
38 changes: 38 additions & 0 deletions nanoq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,44 @@ func TestProcessor_Run_Panic(t *testing.T) {
}
}

func TestProcessor_Run_NoHandler(t *testing.T) {
db, mock, _ := sqlmock.New()
defer db.Close()
client := nanoq.NewClient(sqlx.NewDb(db, "sqlmock"))
processor := nanoq.NewProcessor(client, zerolog.Nop())
errorHandlerCalled := 0
processor.OnError(func(ctx context.Context, task nanoq.Task, err error) {
if !errors.Is(err, nanoq.ErrSkipRetry) && !strings.Contains("no handler found for task type my-type", err.Error()) {
t.Errorf("error handler called with unexpected error: %v", err)
}
errorHandlerCalled++
})

// Task claim and deletion.
mock.ExpectBegin()
rows := sqlmock.NewRows([]string{"id", "fingerprint", "type", "payload", "retries", "max_retries", "created_at", "scheduled_at"}).
AddRow("01HQJHTZCAT5WDCGVTWJ640VMM", "25c084d0", "my-type", "{}", "0", "1", time.Now(), time.Now())
mock.ExpectQuery(`SELECT \* FROM tasks WHERE(.+)`).WillReturnRows(rows)

mock.ExpectExec("DELETE FROM tasks WHERE id = (.+)").WithArgs("01HQJHTZCAT5WDCGVTWJ640VMM").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectCommit()

ctx, cancel := context.WithCancel(context.Background())
go processor.Run(ctx, 1, 1*time.Second)
time.Sleep(1 * time.Second)
cancel()

err := mock.ExpectationsWereMet()
if err != nil {
t.Error(err)
}

if errorHandlerCalled != 1 {
t.Errorf("erorr handler called %v times instead of %v", errorHandlerCalled, 1)
}
}

func TestProcessor_Run_Middleware(t *testing.T) {
// Used to store and retrieve the context value.
type contextKey string
Expand Down

0 comments on commit 909f0e2

Please sign in to comment.