Skip to content

Commit

Permalink
(#31) detect stale active states
Browse files Browse the repository at this point in the history
Also set the queues to max messages per subject of 1
to simplify retries - no need to delete any old item,
it will be automatic. This also sets an automatic dedup,
however when enqueuing we still correctly detect
dupes and error accordingly.

Signed-off-by: R.I.Pienaar <[email protected]>
  • Loading branch information
ripienaar committed Feb 5, 2022
1 parent 8448181 commit 77e5113
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 85 deletions.
70 changes: 35 additions & 35 deletions client_examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,40 @@ func panicIfErr(err error) {
}
}

func ExampleClient_LoadTaskByID() {
client, err := NewClient(NatsContext("WQ"))
func ExampleClient_producer() {
queue := &Queue{
Name: "P100",
MaxRunTime: 60 * time.Minute,
MaxConcurrent: 20,
MaxTries: 100,
}

email := newEmail("[email protected]", "Test Subject", "Test Body")

// Creates a new task that has a deadline for processing 1 hour from now
task, err := NewTask("email:send", email, TaskDeadline(time.Now().Add(time.Hour)))
panicIfErr(err)

task, err := client.LoadTaskByID("24ErgVol4ZjpoQ8FAima9R2jEHB")
// Uses the NATS CLI context WQ for connection details, will create the queue if
// it does not already exist
client, err := NewClient(NatsContext("WQ"), WorkQueue(queue))
panicIfErr(err)

fmt.Printf("Loaded task %s in state %s", task.ID, task.State)
// Adds the task to the queue called P100
err = client.EnqueueTask(context.Background(), task)
panicIfErr(err)
}

// ExampleNewTask_with_deadline shows how to create a task that has a
// deadline for processing and then prints the ID of the new task.
func ExampleNewTask_with_deadline() {
	payload := newEmail("[email protected]", "Test Subject", "Test Body")

	// The task must be processed within 1 hour from now
	deadline := time.Now().Add(time.Hour)

	task, err := NewTask("email:send", payload, TaskDeadline(deadline))
	if err != nil {
		panic(fmt.Sprintf("Could not create task: %v", err))
	}

	fmt.Printf("Task ID: %s\n", task.ID)
}

func ExampleClient_consumer() {
Expand All @@ -41,7 +67,7 @@ func ExampleClient_consumer() {

// Uses the NATS CLI context WQ for connection details, will create the queue if
// it does not already exist
client, err := NewClient(NatsContext("WQ"), WorkQueue(queue))
client, err := NewClient(NatsContext("WQ"), WorkQueue(queue), RetryBackoffPolicy(RetryLinearOneHour))
panicIfErr(err)

router := NewTaskRouter()
Expand All @@ -60,38 +86,12 @@ func ExampleClient_consumer() {
panicIfErr(err)
}

func ExampleClient_producer() {
queue := &Queue{
Name: "P100",
MaxRunTime: 60 * time.Minute,
MaxConcurrent: 20,
MaxTries: 100,
}

email := newEmail("[email protected]", "Test Subject", "Test Body")

// Creates a new task that has a deadline for processing 1 hour from now
task, err := NewTask("email:send", email, TaskDeadline(time.Now().Add(time.Hour)))
panicIfErr(err)

// Uses the NATS CLI context WQ for connection details, will create the queue if
// it does not already exist
client, err := NewClient(NatsContext("WQ"), WorkQueue(queue))
func ExampleClient_LoadTaskByID() {
client, err := NewClient(NatsContext("WQ"))
panicIfErr(err)

// Adds the task to the queue called P100
err = client.EnqueueTask(context.Background(), task)
task, err := client.LoadTaskByID("24ErgVol4ZjpoQ8FAima9R2jEHB")
panicIfErr(err)
}

func ExampleNewTask_with_deadline() {
email := newEmail("[email protected]", "Test Subject", "Test Body")

// Creates a new task that has a deadline for processing 1 hour from now
task, err := NewTask("email:send", email, TaskDeadline(time.Now().Add(time.Hour)))
if err != nil {
panic(fmt.Sprintf("Could not create task: %v", err))
}

fmt.Printf("Task ID: %s\n", task.ID)
fmt.Printf("Loaded task %s in state %s", task.ID, task.State)
}
4 changes: 2 additions & 2 deletions client_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type ClientOpts struct {
replicas int
queue *Queue
taskRetention time.Duration
retryPolicy RetryPolicy
retryPolicy RetryPolicyProvider
memoryStore bool
statsPort int
logger Logger
Expand Down Expand Up @@ -110,7 +110,7 @@ func MemoryStorage() ClientOpt {
}

// RetryBackoffPolicy uses p to schedule job retries, defaults to a linear curve backoff with jitter between 1 and 10 minutes
func RetryBackoffPolicy(p RetryPolicy) ClientOpt {
func RetryBackoffPolicy(p RetryPolicyProvider) ClientOpt {
return func(opts *ClientOpts) error {
opts.retryPolicy = p
return nil
Expand Down
9 changes: 5 additions & 4 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type processor struct {
c *Client
concurrency int
limiter chan struct{}
retryPolicy RetryPolicy
retryPolicy RetryPolicyProvider
log Logger
mu *sync.Mutex
}
Expand Down Expand Up @@ -79,8 +79,9 @@ func (p *processor) processMessage(ctx context.Context, item *ProcessItem) error

switch task.State {
case TaskStateActive:
// TODO: detect stale state
return fmt.Errorf("already active")
if task.LastTriedAt == nil || time.Since(*task.LastTriedAt) < p.queue.MaxRunTime {
return fmt.Errorf("already active")
}

case TaskStateCompleted, TaskStateExpired:
p.c.storage.AckItem(ctx, item)
Expand Down Expand Up @@ -132,7 +133,7 @@ func (p *processor) pollItem(ctx context.Context) (*ProcessItem, error) {
case err != nil:
p.log.Debugf("Unexpected polling error: %v", err)
workQueuePollErrorCounter.WithLabelValues(p.queue.Name).Inc()
if retryLinearTenSeconds.Sleep(ctx, ctr) == context.Canceled {
if RetrySleep(ctx, retryLinearTenSeconds, ctr) == context.Canceled {
return nil, ctx.Err()
}
ctr++
Expand Down
28 changes: 28 additions & 0 deletions processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,34 @@ var _ = Describe("Processor", func() {
})
})

// A task that has been in the active state for longer than the queue's
// MaxRunTime is considered stale, processing it again must not fail with
// an "already active" error.
It("Should detect stale active tasks", func() {
withJetStream(func(nc *nats.Conn, _ *jsm.Manager) {
// use a very short max run time so the active task goes stale quickly
q := newDefaultQueue()
q.MaxRunTime = 100 * time.Millisecond

client, err := NewClient(NatsConn(nc), WorkQueue(q))
Expect(err).ToNot(HaveOccurred())

Expect(client.setupStreams()).ToNot(HaveOccurred())
Expect(client.setupQueues()).ToNot(HaveOccurred())

// enqueue a task and put it in the active state using setTaskActive
task, err := NewTask("ginkgo", "test")
Expect(err).ToNot(HaveOccurred())
Expect(client.EnqueueTask(ctx, task)).ToNot(HaveOccurred())
Expect(client.setTaskActive(ctx, task)).ToNot(HaveOccurred())

proc, err := newProcessor(client)
Expect(err).ToNot(HaveOccurred())

// sleep until past the queue's MaxRunTime
time.Sleep(200 * time.Millisecond)

// NOTE(review): takes an entry from the limiter before calling processMessage
// directly - presumably processMessage hands one back when done, confirm
// against processor.go
<-proc.limiter

// the stale active task should be accepted for processing
Expect(proc.processMessage(ctx, &ProcessItem{JobID: task.ID})).ToNot(HaveOccurred())
})
})

It("Should not process completed or expired tasks", func() {
withJetStream(func(nc *nats.Conn, _ *jsm.Manager) {
client, err := NewClient(NatsConn(nc))
Expand Down
27 changes: 17 additions & 10 deletions retrypolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ type RetryPolicy struct {
Jitter float64
}

// RetryPolicyProvider is the interface that RetryPolicy implements,
// use this to implement your own exponential backoff system or similar for
// task retries.
type RetryPolicyProvider interface {
// Duration is the period to sleep for try n
Duration(n int) time.Duration
}

var (
// RetryLinearTenMinutes is a 20-step policy between 1 and 10 minutes
RetryLinearTenMinutes = linearPolicy(20, 0.90, time.Minute, 10*time.Minute)
Expand All @@ -36,22 +43,22 @@ var (
)

// Duration is the period to sleep for try n, it includes a jitter
func (b RetryPolicy) Duration(n int) time.Duration {
if n >= len(b.Intervals) {
n = len(b.Intervals) - 1
func (p RetryPolicy) Duration(n int) time.Duration {
if n >= len(p.Intervals) {
n = len(p.Intervals) - 1
}

delay := b.jitter(b.Intervals[n])
delay := p.jitter(p.Intervals[n])
if delay == 0 {
delay = b.Intervals[0]
delay = p.Intervals[0]
}

return delay
}

// Sleep attempts to sleep for the relevant duration for n, interruptable by ctx
func (b RetryPolicy) Sleep(ctx context.Context, n int) error {
timer := time.NewTimer(b.Duration(n))
// RetrySleep sleeps for the duration for try n or until interrupted by ctx
func RetrySleep(ctx context.Context, p RetryPolicyProvider, n int) error {
timer := time.NewTimer(p.Duration(n))

select {
case <-timer.C:
Expand Down Expand Up @@ -80,12 +87,12 @@ func linearPolicy(steps uint64, jitter float64, min time.Duration, max time.Dura
return p
}

func (b RetryPolicy) jitter(d time.Duration) time.Duration {
func (p RetryPolicy) jitter(d time.Duration) time.Duration {
if d == 0 {
return 0
}

jf := (float64(d) * b.Jitter) + float64(rand.Int63n(int64(d)))
jf := (float64(d) * p.Jitter) + float64(rand.Int63n(int64(d)))

return time.Duration(jf).Round(time.Millisecond)
}
19 changes: 3 additions & 16 deletions storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type jetStreamStorage struct {
mgr *jsm.Manager

tasks *taskStorage
retry RetryPolicy
retry RetryPolicyProvider

qStreams map[string]*jsm.Stream
qConsumers map[string]*jsm.Consumer
Expand All @@ -64,7 +64,7 @@ type taskMeta struct {
seq uint64
}

func newJetStreamStorage(nc *nats.Conn, rp RetryPolicy, log Logger) (*jetStreamStorage, error) {
func newJetStreamStorage(nc *nats.Conn, rp RetryPolicyProvider, log Logger) (*jetStreamStorage, error) {
if nc == nil {
return nil, fmt.Errorf("no connection supplied")
}
Expand Down Expand Up @@ -134,20 +134,6 @@ func (s *jetStreamStorage) RetryTaskByID(ctx context.Context, queue *Queue, id s
return err
}

q, ok := s.qStreams[queue.Name]
if !ok {
return fmt.Errorf("unknown queue %s", queue.Name)
}

msg, err := q.ReadLastMessageForSubject(fmt.Sprintf(WorkStreamSubjectPattern, queue.Name, task.ID))
if err == nil {
s.log.Debugf("Deleting Work Queue item %d from %s", msg.Sequence, msg.Subject)
err = q.DeleteMessage(msg.Sequence)
if err != nil {
s.log.Warnf("Could not remove work queue item for Task %s from Queue %s", task.ID, queue.Name)
}
}

task.State = TaskStateRetry
task.Result = nil

Expand Down Expand Up @@ -323,6 +309,7 @@ func (s *jetStreamStorage) createQueue(q *Queue, replicas int, memory bool) erro
jsm.Subjects(fmt.Sprintf(WorkStreamSubjectPattern, q.Name, ">")),
jsm.WorkQueueRetention(),
jsm.Replicas(replicas),
jsm.MaxMessagesPerSubject(1),
jsm.StreamDescription("Choria Async Jobs Work Queue"),
}

Expand Down
18 changes: 0 additions & 18 deletions storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,24 +559,6 @@ var _ = Describe("Storage", func() {
})
})

// RetryTaskByID must fail with an "unknown queue" error when the storage
// holds no stream for the queue the task is retried on.
It("Should handle unknown queues", func() {
withJetStream(func(nc *nats.Conn, mgr *jsm.Manager) {
storage, err := newJetStreamStorage(nc, retryForTesting, &defaultLogger{})
Expect(err).ToNot(HaveOccurred())

q := testQueue()
Expect(storage.PrepareQueue(q, 1, true)).ToNot(HaveOccurred())
Expect(storage.PrepareTasks(true, 1, time.Hour)).ToNot(HaveOccurred())

task, err := NewTask("ginkgo", nil)
Expect(err).ToNot(HaveOccurred())
Expect(storage.EnqueueTask(context.Background(), q, task)).ToNot(HaveOccurred())

// remove the queue's stream from the storage so the lookup by queue name fails
delete(storage.qStreams, q.Name)
Expect(storage.RetryTaskByID(context.Background(), q, task.ID)).To(MatchError("unknown queue ginkgo"))
})
})

It("Should remove the task from the queue and enqueue with retry state", func() {
withJetStream(func(nc *nats.Conn, mgr *jsm.Manager) {
storage, err := newJetStreamStorage(nc, retryForTesting, &defaultLogger{})
Expand Down

0 comments on commit 77e5113

Please sign in to comment.