Skip to content

Commit

Permalink
Replace zerolog with slog.
Browse files Browse the repository at this point in the history
  • Loading branch information
bojanz committed Jun 4, 2024
1 parent 688360a commit 628c035
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 49 deletions.
20 changes: 10 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type recalculateStockPayload struct {
}

// This could also be a method on a Handler struct containing dependencies.
func RecalculateStock(logger zerolog.Logger) nanoq.Handler {
func RecalculateStock(logger *slog.Logger) nanoq.Handler {
return func(ctx context.Context, t nanoq.Task) error {
var payload recalculateStockPayload
if err := json.Unmarshal(t.Payload, &payload); err != nil {
Expand All @@ -39,10 +39,10 @@ func RecalculateStock(logger zerolog.Logger) nanoq.Handler {

// Do your thing here.

logger.Info().
Str("task_type", "recalculate-stock").
Str("product_id", payload.ProductID).
Msg("Task completed")
logger.Info("Task completed",
slog.String("task_type", "recalculate-stock"),
slog.String("product_id", payload.ProductID),
)

return nil
}
Expand All @@ -67,7 +67,7 @@ if err := queueClient.CreateTask(ctx, tx, t); err != nanoq.ErrDuplicateTask {

Finally, initialize the processor:
```go
// logger is assumed to be a zerolog instance.
// logger is an existing *slog.Logger.
processor := nanoq.NewProcessor(nanoq.NewClient(db), logger)

// The default retry policy uses an exponential backoff with jitter,
Expand All @@ -82,10 +82,10 @@ processor.RetryPolicy(func (t nanoq.Task) {
processor.OnError(func(ctx context.Context, t nanoq.Task, err error) {
// Log each failed task.
// Idea: Send to Sentry when t.Retries == t.MaxRetries.
logger.Error().
Str("task_type", t.Type).
Str("attempt", fmt.Sprintf("%v/%v", t.Retries, t.MaxRetries)).
Msg(err.Error())
logger.Error(err.Error(),
slog.String("task_type", t.Type),
slog.String("attempt", fmt.Sprintf("%v/%v", t.Retries, t.MaxRetries)),
)
})
processor.Handle("recalculate-stock", RecalculateStock(logger))

Expand Down
9 changes: 1 addition & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,8 @@ module github.com/bojanz/nanoq
go 1.22.0

require (
github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/go-sql-driver/mysql v1.7.1
github.com/jmoiron/sqlx v1.3.5
github.com/oklog/ulid/v2 v2.1.0
github.com/rs/zerolog v1.32.0
)

require (
github.com/DATA-DOG/go-sqlmock v1.5.2 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
golang.org/x/sys v0.12.0 // indirect
)
15 changes: 0 additions & 15 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,30 +1,15 @@
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=
github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g=
github.com/jmoiron/sqlx v1.3.5/go.mod h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ=
github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE=
github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU=
github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.32.0 h1:keLypqrlIjaFsbmJOBdB/qvyF8KEtCWHwobLp5l/mQ0=
github.com/rs/zerolog v1.32.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
14 changes: 7 additions & 7 deletions nanoq.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"hash/crc32"
"log/slog"
"math"
"math/rand/v2"
"os"
Expand All @@ -19,7 +20,6 @@ import (
"github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
"github.com/oklog/ulid/v2"
"github.com/rs/zerolog"
)

var (
Expand Down Expand Up @@ -291,7 +291,7 @@ func DefaultRetryPolicy(t Task) time.Duration {
// Processor represents the queue processor.
type Processor struct {
client *Client
logger zerolog.Logger
logger *slog.Logger

errorHandler ErrorHandler
handlers map[string]Handler
Expand All @@ -303,7 +303,7 @@ type Processor struct {
}

// NewProcessor creates a new processor.
func NewProcessor(client *Client, logger zerolog.Logger) *Processor {
func NewProcessor(client *Client, logger *slog.Logger) *Processor {
return &Processor{
client: client,
logger: logger,
Expand Down Expand Up @@ -353,14 +353,14 @@ func (p *Processor) Run(ctx context.Context, concurrency int, shutdownTimeout ti
case <-ctx.Done():
}

p.logger.Info().Str("timeout", shutdownTimeout.String()).Msg("Shutting down processor")
p.logger.Info("Shutting down processor", slog.String("timeout", shutdownTimeout.String()))
p.done.Store(true)
time.AfterFunc(shutdownTimeout, func() {
cancel(errors.New("shutdown timeout reached"))
})
}()

p.logger.Info().Int("concurrency", concurrency).Msg("Starting processor")
p.logger.Info("Starting processor", slog.Int("concurrency", concurrency))
p.workers = make(chan struct{}, concurrency)
for !p.done.Load() {
// Acquire a worker before claiming a task, to avoid holding claimed tasks while all workers are busy.
Expand All @@ -369,7 +369,7 @@ func (p *Processor) Run(ctx context.Context, concurrency int, shutdownTimeout ti
t, err := p.client.ClaimTask(processorCtx)
if err != nil {
if !errors.Is(err, ErrNoTasks) && !errors.Is(err, context.Canceled) {
p.logger.Error().Err(err).Msg("Could not claim task")
p.logger.Error("Could not claim task", slog.Any("error", err))
}
<-p.workers
time.Sleep(1 * time.Second)
Expand All @@ -378,7 +378,7 @@ func (p *Processor) Run(ctx context.Context, concurrency int, shutdownTimeout ti

go func() {
if err = p.processTask(processorCtx, t); err != nil {
p.logger.Error().Err(err).Msg("Could not process task")
p.logger.Error("Could not process task", slog.Any("error", err))
}
<-p.workers
}()
Expand Down
19 changes: 10 additions & 9 deletions nanoq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"errors"
"fmt"
"io"
"log/slog"
"slices"
"strings"
"testing"
Expand All @@ -14,7 +16,6 @@ import (
"github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
"github.com/oklog/ulid/v2"
"github.com/rs/zerolog"
)

func Test_NewTask(t *testing.T) {
Expand Down Expand Up @@ -151,7 +152,7 @@ func TestProcessor_Run(t *testing.T) {
db, mock, _ := sqlmock.New()
defer db.Close()
client := nanoq.NewClient(sqlx.NewDb(db, "sqlmock"))
processor := nanoq.NewProcessor(client, zerolog.Nop())
processor := nanoq.NewProcessor(client, slog.New(slog.NewTextHandler(io.Discard, nil)))
retryPolicyCalled := 0
processor.RetryPolicy(func(t nanoq.Task) time.Duration {
retryPolicyCalled++
Expand Down Expand Up @@ -224,7 +225,7 @@ func TestProcessor_Run_RetriesExhausted(t *testing.T) {
db, mock, _ := sqlmock.New()
defer db.Close()
client := nanoq.NewClient(sqlx.NewDb(db, "sqlmock"))
processor := nanoq.NewProcessor(client, zerolog.Nop())
processor := nanoq.NewProcessor(client, slog.New(slog.NewTextHandler(io.Discard, nil)))
processor.Handle("my-type", func(ctx context.Context, task nanoq.Task) error {
return errors.New("temporary error")
})
Expand Down Expand Up @@ -284,7 +285,7 @@ func TestProcessor_Run_SkipRetry(t *testing.T) {
db, mock, _ := sqlmock.New()
defer db.Close()
client := nanoq.NewClient(sqlx.NewDb(db, "sqlmock"))
processor := nanoq.NewProcessor(client, zerolog.Nop())
processor := nanoq.NewProcessor(client, slog.New(slog.NewTextHandler(io.Discard, nil)))
processor.Handle("my-type", func(ctx context.Context, task nanoq.Task) error {
return fmt.Errorf("something terrible happened: %w", nanoq.ErrSkipRetry)
})
Expand Down Expand Up @@ -332,7 +333,7 @@ func TestProcessor_Run_Panic(t *testing.T) {
db, mock, _ := sqlmock.New()
defer db.Close()
client := nanoq.NewClient(sqlx.NewDb(db, "sqlmock"))
processor := nanoq.NewProcessor(client, zerolog.Nop())
processor := nanoq.NewProcessor(client, slog.New(slog.NewTextHandler(io.Discard, nil)))
processor.Handle("my-type", func(ctx context.Context, task nanoq.Task) error {
panic(errors.New("oh no"))
})
Expand Down Expand Up @@ -380,7 +381,7 @@ func TestProcessor_Run_NoHandler(t *testing.T) {
db, mock, _ := sqlmock.New()
defer db.Close()
client := nanoq.NewClient(sqlx.NewDb(db, "sqlmock"))
processor := nanoq.NewProcessor(client, zerolog.Nop())
processor := nanoq.NewProcessor(client, slog.New(slog.NewTextHandler(io.Discard, nil)))
errorHandlerCalled := 0
processor.OnError(func(ctx context.Context, task nanoq.Task, err error) {
if !errors.Is(err, nanoq.ErrSkipRetry) || !strings.Contains(err.Error(), "no handler found for task type my-type") {
Expand Down Expand Up @@ -428,7 +429,7 @@ func TestProcessor_Run_Middleware(t *testing.T) {
db, mock, _ := sqlmock.New()
defer db.Close()
client := nanoq.NewClient(sqlx.NewDb(db, "sqlmock"))
processor := nanoq.NewProcessor(client, zerolog.Nop())
processor := nanoq.NewProcessor(client, slog.New(slog.NewTextHandler(io.Discard, nil)))
processor.Use(func(next nanoq.Handler) nanoq.Handler {
return func(ctx context.Context, t nanoq.Task) error {
middlewareValue := ctx.Value(contextKey("middleware"))
Expand Down Expand Up @@ -522,7 +523,7 @@ func TestProcessor_Run_Cancel(t *testing.T) {
db, mock, _ := sqlmock.New()
defer db.Close()
client := nanoq.NewClient(sqlx.NewDb(db, "sqlmock"))
processor := nanoq.NewProcessor(client, zerolog.Nop())
processor := nanoq.NewProcessor(client, slog.New(slog.NewTextHandler(io.Discard, nil)))
processor.Handle("my-type", func(ctx context.Context, task nanoq.Task) error {
for {
select {
Expand Down Expand Up @@ -566,7 +567,7 @@ func TestProcessor_Run_Timeout(t *testing.T) {
db, mock, _ := sqlmock.New()
defer db.Close()
client := nanoq.NewClient(sqlx.NewDb(db, "sqlmock"))
processor := nanoq.NewProcessor(client, zerolog.Nop())
processor := nanoq.NewProcessor(client, slog.New(slog.NewTextHandler(io.Discard, nil)))
processor.Handle("my-type", func(ctx context.Context, task nanoq.Task) error {
for {
select {
Expand Down

0 comments on commit 628c035

Please sign in to comment.