diff --git a/README.md b/README.md index a93a5cb..550f19f 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,7 @@ type recalculateStockPayload struct { } // This could also be a method on a Handler struct containing dependencies. -func RecalculateStock(logger zerolog.Logger) nanoq.Handler { +func RecalculateStock(logger *slog.Logger) nanoq.Handler { return func(ctx context.Context, t nanoq.Task) error { var payload recalculateStockPayload if err := json.Unmarshal(t.Payload, &payload); err != nil { @@ -39,10 +39,10 @@ func RecalculateStock(logger zerolog.Logger) nanoq.Handler { // Do your thing here. - logger.Info(). - Str("task_type", "recalculate-stock"). - Str("product_id", payload.ProductID). - Msg("Task completed") + logger.Info("Task completed", + slog.String("task_type", "recalculate-stock"), + slog.String("product_id", payload.ProductID), + ) return nil } @@ -67,7 +67,7 @@ if err := queueClient.CreateTask(ctx, tx, t); err != nanoq.ErrDuplicateTask { Finally, initialize the processor: ```go -// logger is assumed to be a zerolog instance. +// logger is an existing *slog.Logger. processor := nanoq.NewProcessor(nanoq.NewClient(db), logger) // The default retry policy uses an exponential backoff with jitter, @@ -82,10 +82,10 @@ processor.RetryPolicy(func (t nanoq.Task) { processor.OnError(func(ctx context.Context, t nanoq.Task, err error) { // Log each failed task. // Idea: Send to Sentry when t.Retries == t.MaxRetries. - logger.Error(). - Str("task_type", t.Type). - Str("attempt", fmt.Sprintf("%v/%v", t.Retries, t.MaxRetries)). - Msg(err.Error()) + logger.Error(err.Error(), + slog.String("task_type", t.Type), + slog.String("attempt", fmt.Sprintf("%v/%v", t.Retries, t.MaxRetries)), + ) }) processor.Handle("recalculate-stock", RecalculateStock(logger)) diff --git a/go.mod b/go.mod index 14da4a9..bae19dd 100644 --- a/go.mod +++ b/go.mod @@ -3,15 +3,8 @@ module github.com/bojanz/nanoq go 1.22.0 require ( + github.com/DATA-DOG/go-sqlmock v1.5.2 github.com/go-sql-driver/mysql v1.7.1 github.com/jmoiron/sqlx v1.3.5 github.com/oklog/ulid/v2 v2.1.0 - github.com/rs/zerolog v1.32.0 -) - -require ( - github.com/DATA-DOG/go-sqlmock v1.5.2 // indirect - github.com/mattn/go-colorable v0.1.13 // indirect - github.com/mattn/go-isatty v0.0.19 // indirect - golang.org/x/sys v0.12.0 // indirect ) diff --git a/go.sum b/go.sum index 4ad3ca8..96f2797 100644 --- a/go.sum +++ b/go.sum @@ -1,30 +1,15 @@ github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= -github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= -github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g= github.com/jmoiron/sqlx v1.3.5/go.mod h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ= github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE= github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= -github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= -github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= -github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= -github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= -github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg= github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU= github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ= github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o= -github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= -github.com/rs/zerolog v1.32.0 h1:keLypqrlIjaFsbmJOBdB/qvyF8KEtCWHwobLp5l/mQ0= -github.com/rs/zerolog v1.32.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= -golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= -golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/nanoq.go b/nanoq.go index 24dc22e..b654577 100644 --- a/nanoq.go +++ b/nanoq.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "hash/crc32" + "log/slog" "math" "math/rand/v2" "os" @@ -19,7 +20,6 @@ import ( "github.com/go-sql-driver/mysql" "github.com/jmoiron/sqlx" "github.com/oklog/ulid/v2" - "github.com/rs/zerolog" ) var ( @@ -291,7 +291,7 @@ func DefaultRetryPolicy(t Task) time.Duration { // Processor represents the queue processor. type Processor struct { client *Client - logger zerolog.Logger + logger *slog.Logger errorHandler ErrorHandler handlers map[string]Handler @@ -303,7 +303,7 @@ type Processor struct { } // NewProcessor creates a new processor. -func NewProcessor(client *Client, logger zerolog.Logger) *Processor { +func NewProcessor(client *Client, logger *slog.Logger) *Processor { return &Processor{ client: client, logger: logger, @@ -353,14 +353,14 @@ func (p *Processor) Run(ctx context.Context, concurrency int, shutdownTimeout ti case <-ctx.Done(): } - p.logger.Info().Str("timeout", shutdownTimeout.String()).Msg("Shutting down processor") + p.logger.Info("Shutting down processor", slog.String("timeout", shutdownTimeout.String())) p.done.Store(true) time.AfterFunc(shutdownTimeout, func() { cancel(errors.New("shutdown timeout reached")) }) }() - p.logger.Info().Int("concurrency", concurrency).Msg("Starting processor") + p.logger.Info("Starting processor", slog.Int("concurrency", concurrency)) p.workers = make(chan struct{}, concurrency) for !p.done.Load() { // Acquire a worker before claiming a task, to avoid holding claimed tasks while all workers are busy. @@ -369,7 +369,7 @@ func (p *Processor) Run(ctx context.Context, concurrency int, shutdownTimeout ti t, err := p.client.ClaimTask(processorCtx) if err != nil { if !errors.Is(err, ErrNoTasks) && !errors.Is(err, context.Canceled) { - p.logger.Error().Err(err).Msg("Could not claim task") + p.logger.Error("Could not claim task", slog.Any("error", err)) } <-p.workers time.Sleep(1 * time.Second) @@ -378,7 +378,7 @@ func (p *Processor) Run(ctx context.Context, concurrency int, shutdownTimeout ti go func() { if err = p.processTask(processorCtx, t); err != nil { - p.logger.Error().Err(err).Msg("Could not process task") + p.logger.Error("Could not process task", slog.Any("error", err)) } <-p.workers }() diff --git a/nanoq_test.go b/nanoq_test.go index 6c7582e..4c34af7 100644 --- a/nanoq_test.go +++ b/nanoq_test.go @@ -4,6 +4,8 @@ import ( "context" "errors" "fmt" + "io" + "log/slog" "slices" "strings" "testing" @@ -14,7 +16,6 @@ import ( "github.com/go-sql-driver/mysql" "github.com/jmoiron/sqlx" "github.com/oklog/ulid/v2" - "github.com/rs/zerolog" ) func Test_NewTask(t *testing.T) { @@ -151,7 +152,7 @@ func TestProcessor_Run(t *testing.T) { db, mock, _ := sqlmock.New() defer db.Close() client := nanoq.NewClient(sqlx.NewDb(db, "sqlmock")) - processor := nanoq.NewProcessor(client, zerolog.Nop()) + processor := nanoq.NewProcessor(client, slog.New(slog.NewTextHandler(io.Discard, nil))) retryPolicyCalled := 0 processor.RetryPolicy(func(t nanoq.Task) time.Duration { retryPolicyCalled++ @@ -224,7 +225,7 @@ func TestProcessor_Run_RetriesExhausted(t *testing.T) { db, mock, _ := sqlmock.New() defer db.Close() client := nanoq.NewClient(sqlx.NewDb(db, "sqlmock")) - processor := nanoq.NewProcessor(client, zerolog.Nop()) + processor := nanoq.NewProcessor(client, slog.New(slog.NewTextHandler(io.Discard, nil))) processor.Handle("my-type", func(ctx context.Context, task nanoq.Task) error { return errors.New("temporary error") }) @@ -284,7 +285,7 @@ func TestProcessor_Run_SkipRetry(t *testing.T) { db, mock, _ := sqlmock.New() defer db.Close() client := nanoq.NewClient(sqlx.NewDb(db, "sqlmock")) - processor := nanoq.NewProcessor(client, zerolog.Nop()) + processor := nanoq.NewProcessor(client, slog.New(slog.NewTextHandler(io.Discard, nil))) processor.Handle("my-type", func(ctx context.Context, task nanoq.Task) error { return fmt.Errorf("something terrible happened: %w", nanoq.ErrSkipRetry) }) @@ -332,7 +333,7 @@ func TestProcessor_Run_Panic(t *testing.T) { db, mock, _ := sqlmock.New() defer db.Close() client := nanoq.NewClient(sqlx.NewDb(db, "sqlmock")) - processor := nanoq.NewProcessor(client, zerolog.Nop()) + processor := nanoq.NewProcessor(client, slog.New(slog.NewTextHandler(io.Discard, nil))) processor.Handle("my-type", func(ctx context.Context, task nanoq.Task) error { panic(errors.New("oh no")) }) @@ -380,7 +381,7 @@ func TestProcessor_Run_NoHandler(t *testing.T) { db, mock, _ := sqlmock.New() defer db.Close() client := nanoq.NewClient(sqlx.NewDb(db, "sqlmock")) - processor := nanoq.NewProcessor(client, zerolog.Nop()) + processor := nanoq.NewProcessor(client, slog.New(slog.NewTextHandler(io.Discard, nil))) errorHandlerCalled := 0 processor.OnError(func(ctx context.Context, task nanoq.Task, err error) { if !errors.Is(err, nanoq.ErrSkipRetry) || !strings.Contains(err.Error(), "no handler found for task type my-type") { @@ -428,7 +429,7 @@ func TestProcessor_Run_Middleware(t *testing.T) { db, mock, _ := sqlmock.New() defer db.Close() client := nanoq.NewClient(sqlx.NewDb(db, "sqlmock")) - processor := nanoq.NewProcessor(client, zerolog.Nop()) + processor := nanoq.NewProcessor(client, slog.New(slog.NewTextHandler(io.Discard, nil))) processor.Use(func(next nanoq.Handler) nanoq.Handler { return func(ctx context.Context, t nanoq.Task) error { middlewareValue := ctx.Value(contextKey("middleware")) @@ -522,7 +523,7 @@ func TestProcessor_Run_Cancel(t *testing.T) { db, mock, _ := sqlmock.New() defer db.Close() client := nanoq.NewClient(sqlx.NewDb(db, "sqlmock")) - processor := nanoq.NewProcessor(client, zerolog.Nop()) + processor := nanoq.NewProcessor(client, slog.New(slog.NewTextHandler(io.Discard, nil))) processor.Handle("my-type", func(ctx context.Context, task nanoq.Task) error { for { select { @@ -566,7 +567,7 @@ func TestProcessor_Run_Timeout(t *testing.T) { db, mock, _ := sqlmock.New() defer db.Close() client := nanoq.NewClient(sqlx.NewDb(db, "sqlmock")) - processor := nanoq.NewProcessor(client, zerolog.Nop()) + processor := nanoq.NewProcessor(client, slog.New(slog.NewTextHandler(io.Discard, nil))) processor.Handle("my-type", func(ctx context.Context, task nanoq.Task) error { for { select {