Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add multi queue backend support #57

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ var (
exitOnComplete bool
isStrict bool
useNumber bool
multiQueue bool
)

// Namespace returns the namespace flag for goworker. You
Expand Down Expand Up @@ -138,6 +139,8 @@ func init() {
flag.BoolVar(&exitOnComplete, "exit-on-complete", false, "exit when the queue is empty")

flag.BoolVar(&useNumber, "use-number", false, "use json.Number instead of float64 when decoding numbers in JSON. will default to true soon")

flag.BoolVar(&multiQueue, "multi-queue", false, "use the multi-queue failure backend")
}

func flags() error {
Expand Down
48 changes: 36 additions & 12 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"encoding/json"
"errors"
"fmt"
"runtime"
"strings"
"sync"
"time"
)
Expand All @@ -12,6 +14,15 @@ type worker struct {
process
}

type stacktraceError struct {
Err error
Stacktrace []string
}

func (e *stacktraceError) Error() string {
return e.Err.Error()
}

func newWorker(id string, queues []string) (*worker, error) {
process, err := newProcess(id, queues)
if err != nil {
Expand Down Expand Up @@ -44,20 +55,25 @@ func (w *worker) start(conn *RedisConn, job *Job) error {
return w.process.start(conn)
}

func (w *worker) fail(conn *RedisConn, job *Job, err error) error {
func (w *worker) fail(conn *RedisConn, job *Job, traceErr *stacktraceError) error {
failure := &failure{
FailedAt: time.Now(),
Payload: job.Payload,
Exception: "Error",
Error: err.Error(),
Error: traceErr.Error(),
Worker: w,
Queue: job.Queue,
Backtrace: traceErr.Stacktrace,
}
buffer, err := json.Marshal(failure)
if err != nil {
return err
}
conn.Send("RPUSH", fmt.Sprintf("%sfailed", namespace), buffer)
if multiQueue {
conn.Send("RPUSH", fmt.Sprintf("%s%s_failed", namespace, job.Queue), buffer)
} else {
conn.Send("RPUSH", fmt.Sprintf("%sfailed", namespace), buffer)
}

return w.process.fail(conn)
}
Expand All @@ -69,8 +85,8 @@ func (w *worker) succeed(conn *RedisConn, job *Job) error {
return nil
}

func (w *worker) finish(conn *RedisConn, job *Job, err error) error {
if err != nil {
func (w *worker) finish(conn *RedisConn, job *Job, err *stacktraceError) error {
if err.Err != nil {
w.fail(conn, job, err)
} else {
w.succeed(conn, job)
Expand Down Expand Up @@ -117,7 +133,10 @@ func (w *worker) work(jobs <-chan *Job, monitor *sync.WaitGroup) {
logger.Criticalf("Error on getting connection in worker %v", w)
return
} else {
w.finish(conn, job, errors.New(errorLog))
stackErr := &stacktraceError{
Err: errors.New(errorLog),
}
w.finish(conn, job, stackErr)
PutConn(conn)
}
}
Expand All @@ -126,30 +145,35 @@ func (w *worker) work(jobs <-chan *Job, monitor *sync.WaitGroup) {
}

func (w *worker) run(job *Job, workerFunc workerFunc) {
var err error
var err stacktraceError
defer func() {
conn, errCon := GetConn()
if errCon != nil {
logger.Criticalf("Error on getting connection in worker %v", w)
return
} else {
w.finish(conn, job, err)
w.finish(conn, job, &err)
PutConn(conn)
}
}()
defer func() {
if r := recover(); r != nil {
err = errors.New(fmt.Sprint(r))
stackBuf := make([]byte, 2048)
runtime.Stack(stackBuf, false)
stack := string(stackBuf[:])
err.Err = errors.New(fmt.Sprint(r))
err.Stacktrace = strings.Split(stack, "\n")
}
}()

conn, err := GetConn()
if err != nil {
var conn *RedisConn
conn, err.Err = GetConn()
if err.Err != nil {
logger.Criticalf("Error on getting connection in worker %v", w)
return
} else {
w.start(conn, job)
PutConn(conn)
}
err = workerFunc(job.Queue, job.Payload.Args...)
err.Err = workerFunc(job.Queue, job.Payload.Args...)
}
5 changes: 5 additions & 0 deletions workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ func Enqueue(job *Job) error {
logger.Criticalf("Cant push to queue")
return err
}
err = conn.Send("SADD", fmt.Sprintf("%squeues", namespace), job.Queue)
if err != nil {
logger.Criticalf("Can't watch queue")
return err
}

return conn.Flush()
}