diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 94f7073..9a22db1 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -32,3 +32,28 @@ jobs: run: | go vet ./... staticcheck ./... + + test: + strategy: + matrix: + go: ['1.20', '1.21', '1.22'] + name: test @ Go ${{ matrix.go }} + runs-on: ubuntu-latest + steps: + - name: Install Go + uses: actions/setup-go@v3 + with: + go-version: ${{ matrix.go }} + + - name: Checkout code + uses: actions/checkout@v4 + + - uses: actions/cache@v3 + with: + path: ~/go/pkg/mod + key: ${{ runner.os }}-build-${{ hashFiles('**/go.sum') }} + restore-keys: | + ${{ runner.os }}-build- + + - name: Test + run: go test -v -race ./... diff --git a/go.mod b/go.mod index b604059..14da4a9 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( ) require ( + github.com/DATA-DOG/go-sqlmock v1.5.2 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.19 // indirect golang.org/x/sys v0.12.0 // indirect diff --git a/go.sum b/go.sum index 7a36ec3..4ad3ca8 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= +github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= @@ -5,6 +7,7 @@ github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9 github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g= github.com/jmoiron/sqlx v1.3.5/go.mod h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ= +github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE= github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= diff --git a/nanoq_test.go b/nanoq_test.go new file mode 100644 index 0000000..a0e29e5 --- /dev/null +++ b/nanoq_test.go @@ -0,0 +1,186 @@ +package nanoq_test + +import ( + "context" + "errors" + "slices" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/bojanz/nanoq" + "github.com/go-sql-driver/mysql" + "github.com/jmoiron/sqlx" + "github.com/oklog/ulid/v2" + "github.com/rs/zerolog" +) + +func Test_NewTask(t *testing.T) { + t.Run("empty_task", func(t *testing.T) { + task := nanoq.NewTask("my-type", nil) + + if _, err := ulid.ParseStrict(task.ID); err != nil { + t.Errorf("id: %v", err) + } + if task.Type != "my-type" { + t.Errorf("type: got %q, want %q", task.Type, "my-type") + } + if !slices.Equal(task.Payload, []byte("{}")) { + t.Errorf("payload: got %q, want %q", task.Payload, []byte("{}")) + } + if task.MaxRetries != 10 { + t.Errorf("max retries: got %v, want %v", task.MaxRetries, 10) + } + if task.CreatedAt.IsZero() { + t.Errorf("created_at must not be empty") + } + if task.ScheduledAt.IsZero() { + t.Errorf("scheduled_at must not be empty") + } + if !task.CreatedAt.Equal(task.ScheduledAt) { + t.Errorf("created_at %q does not match scheduled_at %q", task.CreatedAt, task.ScheduledAt) + } + if task.Fingerprint != "25c084d0" { + t.Errorf("fingerprint: got %q, want %q", task.Fingerprint, "25c084d0") + } + }) + + t.Run("payload_and_options", func(t *testing.T) { + payload := []byte(`{"product_id": "123", "user_id": "456"}`) + scheduledAt := time.Date(2030, 1, 1, 0, 0, 0, 0, time.UTC) + task := nanoq.NewTask("my-type", payload, nanoq.WithMaxRetries(2), nanoq.WithScheduledAt(scheduledAt)) + + if _, err := ulid.ParseStrict(task.ID); err != nil { + t.Errorf("id: %v", err) + } + if task.Type != "my-type" { + t.Errorf("type: got %q, want %q", task.Type, "my-type") + } + if !slices.Equal(task.Payload, payload) { + t.Errorf("payload: got %q, want %q", task.Payload, payload) + } + if task.MaxRetries != 2 { + t.Errorf("max retries: got %v, want %v", task.MaxRetries, 2) + } + if task.CreatedAt.IsZero() { + t.Errorf("created_at must not be empty") + } + if !task.ScheduledAt.Equal(scheduledAt) { + t.Errorf("created_at: got %q want %q", task.ScheduledAt, scheduledAt) + } + if task.Fingerprint != "3f16b1c4" { + t.Errorf("fingerprint: got %q, want %q", task.Fingerprint, "3f16b1c4") + } + }) + + t.Run("custom_fingerprint", func(t *testing.T) { + payload := []byte(`{"product_id": "123", "user_id": "456"}`) + fingerprintData := []byte(`{"product_id": "123"}`) + task := nanoq.NewTask("my-type", payload, nanoq.WithFingerprintData(fingerprintData)) + + if task.Type != "my-type" { + t.Errorf("type: got %q, want %q", task.Type, "my-type") + } + if !slices.Equal(task.Payload, payload) { + t.Errorf("payload: got %q, want %q", task.Payload, payload) + } + if task.Fingerprint != "a48cb4c4" { + t.Errorf("fingerprint: got %q, want %q", task.Fingerprint, "a48cb4c4") + } + }) +} + +func TestClient_CreateTask(t *testing.T) { + ctx := context.Background() + db, mock, _ := sqlmock.New() + defer db.Close() + client := nanoq.NewClient(sqlx.NewDb(db, "sqlmock")) + task := nanoq.NewTask("my-type", nil) + + t.Run("success", func(t *testing.T) { + mock.ExpectBegin() + mock.ExpectExec(`INSERT INTO tasks(.+) VALUES(.+)`). + WithArgs(task.ID, task.Fingerprint, task.Type, task.Payload, task.Retries, task.MaxRetries, task.CreatedAt, task.ScheduledAt). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectCommit() + + client.RunTransaction(ctx, func(tx *sqlx.Tx) error { + return client.CreateTask(ctx, tx, task) + }) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Error(err) + } + }) + + t.Run("duplicate", func(t *testing.T) { + mock.ExpectBegin() + mock.ExpectExec(`INSERT INTO tasks(.+) VALUES(.+)`). + WithArgs(task.ID, task.Fingerprint, task.Type, task.Payload, task.Retries, task.MaxRetries, task.CreatedAt, task.ScheduledAt). + WillReturnError(&mysql.MySQLError{Number: 1022}) + mock.ExpectRollback() + + err := client.RunTransaction(ctx, func(tx *sqlx.Tx) error { + return client.CreateTask(ctx, tx, task) + }) + if err != nanoq.ErrDuplicateTask { + t.Errorf("got %v, want ErrDuplicateTask", err) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Error(err) + } + }) +} + +func TestProcessor_Run(t *testing.T) { + db, mock, _ := sqlmock.New() + defer db.Close() + client := nanoq.NewClient(sqlx.NewDb(db, "sqlmock")) + processor := nanoq.NewProcessor(client, zerolog.Nop()) + processor.Handle("my-type", func(ctx context.Context, task nanoq.Task) error { + // Fail the task once. + if task.Retries == 0 { + return errors.New("temporary error") + } + return nil + }) + errorHandlerCalled := 0 + processor.OnError(func(ctx context.Context, task nanoq.Task, err error) { + errorHandlerCalled++ + }) + + // First task claim and retry. + mock.ExpectBegin() + rows := sqlmock.NewRows([]string{"id", "fingerprint", "type", "payload", "retries", "max_retries", "created_at", "scheduled_at"}). + AddRow("01HQJHTZCAT5WDCGVTWJ640VMM", "25c084d0", "my-type", "{}", "0", "1", time.Now(), time.Now()) + mock.ExpectQuery(`SELECT \* FROM tasks WHERE(.+)`).WillReturnRows(rows) + + mock.ExpectExec("UPDATE tasks SET retries = (.+), scheduled_at = (.+) WHERE id = (.+)").WithArgs(1, sqlmock.AnyArg(), "01HQJHTZCAT5WDCGVTWJ640VMM"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectCommit() + + // Second task claim and deletion (due to success). + mock.ExpectBegin() + rows = sqlmock.NewRows([]string{"id", "fingerprint", "type", "payload", "retries", "max_retries", "created_at", "scheduled_at"}). + AddRow("01HQJHTZCAT5WDCGVTWJ640VMM", "25c084d0", "my-type", "{}", "1", "1", time.Now(), time.Now()) + mock.ExpectQuery(`SELECT \* FROM tasks WHERE(.+)`).WillReturnRows(rows) + + mock.ExpectExec("DELETE FROM tasks WHERE id = (.+)").WithArgs("01HQJHTZCAT5WDCGVTWJ640VMM"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectCommit() + + ctx, cancel := context.WithCancel(context.Background()) + go processor.Run(ctx, 1, 1*time.Second) + time.Sleep(1 * time.Second) + cancel() + + err := mock.ExpectationsWereMet() + if err != nil { + t.Error(err) + } + + if errorHandlerCalled != 1 { + t.Errorf("erorr handler called %v times instead of %v", errorHandlerCalled, 1) + } +}