Skip to content

Commit

Permalink
Merge pull request #33 from ripienaar/15
Browse files Browse the repository at this point in the history
(#15) support basic lifecycle events
  • Loading branch information
ripienaar authored Feb 7, 2022
2 parents 0b1b94b + 01f6809 commit 90b4611
Show file tree
Hide file tree
Showing 12 changed files with 206 additions and 76 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,15 @@ This package heavily inspired by [hibiken/asynq](https://github.com/hibiken/asyn

## Status

This is a brand-new project, under heavy development and relies on unreleased behaviors in JetStream. Interfaces might change,
This is a brand-new project, under heavy development. Interfaces might change,
Structures might change, features might be removed if it's found to be a bad fit for the underlying storage.

Use with care.

## Requirements

NATS 2.7.2 with JetStream enabled.

## Features

This feature list is incomplete, at present the focus is on determining what will work well for the particular patterns
Expand Down
47 changes: 19 additions & 28 deletions ajc/task_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ func configureTaskCommand(app *kingpin.Application) {
config := tasks.Command("configure", "Configures the Task storage").Alias("config").Alias("cfg").Action(c.configAction)
config.Arg("retention", "Sets how long Tasks are kept in the Task Store").Required().DurationVar(&c.retention)

tasks.Command("watch", "Watch job updates in real time").Action(c.watchAction)
watch := tasks.Command("watch", "Watch job updates in real time").Action(c.watchAction)
watch.Flag("task", "Watch for updates related to a specific task ID").StringVar(&c.id)

process := tasks.Command("process", "Process Tasks from a given queue").Action(c.processAction)
process.Arg("type", "Types of Tasks to process").Required().Envar("AJC_TYPE").StringVar(&c.ttype)
Expand Down Expand Up @@ -125,18 +126,17 @@ func (c *taskCommand) watchAction(_ *kingpin.ParseContext) error {
return err
}

mgr, stream, err := admin.TasksStore()
mgr, _, err := admin.TasksStore()
if err != nil {
return err
}

nc := mgr.NatsConn()
sub, err := mgr.NatsConn().SubscribeSync(nc.NewRespInbox())
if err != nil {
return err
target := asyncjobs.EventsSubjectWildcard
if c.id != "" {
target = fmt.Sprintf(asyncjobs.TaskStateChangeEventSubjectPattern, c.id)
}

_, err = stream.NewConsumer(jsm.StartWithLastReceived(), jsm.DeliverySubject(sub.Subject), jsm.AcknowledgeNone(), jsm.PushFlowControl(), jsm.IdleHeartbeat(time.Minute))
sub, err := mgr.NatsConn().SubscribeSync(target)
if err != nil {
return err
}
Expand All @@ -147,32 +147,23 @@ func (c *taskCommand) watchAction(_ *kingpin.ParseContext) error {
return err
}

if len(msg.Data) == 0 {
if msg.Reply != "" {
msg.Respond(nil)
}

continue
}

task := &asyncjobs.Task{}
err = json.Unmarshal(msg.Data, task)
event, kind, err := asyncjobs.ParseEventJSON(msg.Data)
if err != nil {
fmt.Printf("Invalid task update received: %v: %q\n", err, msg.Data)
continue
fmt.Printf("Could not parse event: %v\n", err)
}

ts := time.Now()
if task.LastTriedAt != nil && !task.LastTriedAt.IsZero() {
ts = *task.LastTriedAt
}
switch e := event.(type) {
case asyncjobs.TaskStateChangeEvent:
ts := time.Unix(0, e.TimeStamp)
if e.LastErr == "" {
fmt.Printf("[%s] %s: queue: %s type: %s tries: %d state: %s\n", ts.Format("15:04:05"), e.TaskID, e.Queue, e.TaskType, e.Tries, e.State)
} else {
fmt.Printf("[%s] %s: queue: %s type: %s tries: %d state: %s error: %s\n", ts.Format("15:04:05"), e.TaskID, e.Queue, e.TaskType, e.Tries, e.State, e.LastErr)
}

if task.LastErr == "" {
fmt.Printf("[%s] %s: queue: %s type: %s tries: %d state: %s\n", ts.Format("15:04:05"), task.ID, task.Queue, task.Type, task.Tries, task.State)
} else {
fmt.Printf("[%s] %s: queue: %s type: %s tries: %d state: %s error: %s\n", ts.Format("15:04:05"), task.ID, task.Queue, task.Type, task.Tries, task.State, task.LastErr)
default:
fmt.Printf("[%s] Unknown event type %s\n", time.Now().UTC().Format("15:04:05"), kind)
}

}
}

Expand Down
34 changes: 13 additions & 21 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ var (

// Storage implements the backend access
type Storage interface {
SaveTaskState(ctx context.Context, task *Task) error
SaveTaskState(ctx context.Context, task *Task, notify bool) error
EnqueueTask(ctx context.Context, queue *Queue, task *Task) error
RetryTaskByID(ctx context.Context, queue *Queue, id string) error
LoadTaskByID(id string) (*Task, error)
PublishTaskStateChangeEvent(ctx context.Context, task *Task) error
AckItem(ctx context.Context, item *ProcessItem) error
NakItem(ctx context.Context, item *ProcessItem) error
TerminateItem(ctx context.Context, item *ProcessItem) error
Expand Down Expand Up @@ -177,7 +178,7 @@ func (c *Client) setTaskActive(ctx context.Context, t *Task) error {
t.LastTriedAt = nowPointer()
t.LastErr = ""

return c.storage.SaveTaskState(ctx, t)
return c.storage.SaveTaskState(ctx, t, true)
}

func (c *Client) shouldDiscardTask(t *Task) bool {
Expand All @@ -190,9 +191,9 @@ func (c *Client) shouldDiscardTask(t *Task) bool {
return false
}

func (c *Client) discardTaskIfDesired(t *Task) error {
func (c *Client) saveOrDiscardTaskIfDesired(ctx context.Context, t *Task) error {
if !c.shouldDiscardTask(t) {
return nil
return c.storage.SaveTaskState(ctx, t, true)
}

c.log.Debugf("Discarding task with state %s based on desired discards %q", t.State, c.opts.discard)
Expand All @@ -209,25 +210,21 @@ func (c *Client) setTaskSuccess(ctx context.Context, t *Task, payload interface{
CompletedAt: time.Now().UTC(),
}

err := c.storage.SaveTaskState(ctx, t)
if err != nil {
return err
}

return c.discardTaskIfDesired(t)
return c.saveOrDiscardTaskIfDesired(ctx, t)
}

func (c *Client) handleTaskTerminated(ctx context.Context, t *Task, terr error) error {
t.LastErr = terr.Error()
t.LastTriedAt = nowPointer()
t.State = TaskStateTerminated

err := c.storage.SaveTaskState(ctx, t)
if err != nil {
return err
}
return c.saveOrDiscardTaskIfDesired(ctx, t)
}

func (c *Client) handleTaskExpired(ctx context.Context, t *Task) error {
t.State = TaskStateExpired

return c.discardTaskIfDesired(t)
return c.saveOrDiscardTaskIfDesired(ctx, t)
}

func (c *Client) handleTaskError(ctx context.Context, t *Task, terr error) error {
Expand All @@ -242,12 +239,7 @@ func (c *Client) handleTaskError(ctx context.Context, t *Task, terr error) error
}
}

err := c.storage.SaveTaskState(ctx, t)
if err != nil {
return err
}

return c.discardTaskIfDesired(t)
return c.saveOrDiscardTaskIfDesired(ctx, t)
}

func (c *Client) setupQueues() error {
Expand Down
6 changes: 3 additions & 3 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ var _ = Describe("Client", func() {
})
})

Describe("discardTaskIfDesired", func() {
Describe("saveOrDiscardTaskIfDesired", func() {
It("Should delete the correct tasks", func() {
withJetStream(func(nc *nats.Conn, mgr *jsm.Manager) {
client, err := NewClient(NatsConn(nc), DiscardTaskStates(TaskStateExpired, TaskStateCompleted))
Expand All @@ -97,12 +97,12 @@ var _ = Describe("Client", func() {
Expect(err).ToNot(HaveOccurred())
Expect(client.EnqueueTask(context.Background(), task)).ToNot(HaveOccurred())

Expect(client.discardTaskIfDesired(task)).ToNot(HaveOccurred())
Expect(client.saveOrDiscardTaskIfDesired(context.Background(), task)).ToNot(HaveOccurred())
_, err = client.LoadTaskByID(task.ID)
Expect(err).ToNot(HaveOccurred())

task.State = TaskStateExpired
Expect(client.discardTaskIfDesired(task)).ToNot(HaveOccurred())
Expect(client.saveOrDiscardTaskIfDesired(context.Background(), task)).ToNot(HaveOccurred())
_, err = client.LoadTaskByID(task.ID)
Expect(err).To(MatchError("task not found"))
})
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/AlecAivazis/survey/v2 v2.3.2
github.com/dustin/go-humanize v1.0.0
github.com/nats-io/jsm.go v0.0.28-0.20220128163911-90cd1007b323
github.com/nats-io/nats-server/v2 v2.7.2-0.20220201222209-2ed7a812d8b2
github.com/nats-io/nats-server/v2 v2.7.2
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d
github.com/onsi/ginkgo/v2 v2.1.1
github.com/onsi/gomega v1.17.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ github.com/nats-io/jsm.go v0.0.28-0.20220128163911-90cd1007b323/go.mod h1:HU1JmK
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.7.2-0.20220126224453-26b692ee73c0/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8=
github.com/nats-io/nats-server/v2 v2.7.2-0.20220201222209-2ed7a812d8b2 h1:/ocgZt+pxx9ocGWWdeBAJfg0tqoz4uUoIgCNepKnnDQ=
github.com/nats-io/nats-server/v2 v2.7.2-0.20220201222209-2ed7a812d8b2/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8=
github.com/nats-io/nats-server/v2 v2.7.2 h1:+LEN8m0+jdCkiGc884WnDuxR+qj80/5arj+szKuRpRI=
github.com/nats-io/nats-server/v2 v2.7.2/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8=
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d h1:GRSmEJutHkdoxKsRypP575IIdoXe7Bm6yHQF6GcDBnA=
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
Expand Down
85 changes: 85 additions & 0 deletions lifecycle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright (c) 2022, R.I. Pienaar and the Project contributors
//
// SPDX-License-Identifier: Apache-2.0

package asyncjobs

import (
"encoding/json"
"fmt"
"time"

"github.com/segmentio/ksuid"
)

type BaseEvent struct {
EventID string `json:"event_id"`
EventType string `json:"type"`
TimeStamp int64 `json:"timestamp"`
}

type TaskStateChangeEvent struct {
TaskID string `json:"task_id"`
State TaskState `json:"state"`
Tries int `json:"tries"`
Queue string `json:"queue"`
TaskType string `json:"task_type"`
LastErr string `json:"last_error,omitempty"`
Age time.Duration `json:"task_age,omitempty"`

BaseEvent
}

const (
// TaskStateChangeEventType is the event type for TaskStateChangeEvent types
TaskStateChangeEventType = "io.choria.asyncjobs.v1.task_state"
)

func ParseEventJSON(event []byte) (interface{}, string, error) {
var base BaseEvent
err := json.Unmarshal(event, &base)
if err != nil {
return nil, "", err
}

switch base.EventType {
case TaskStateChangeEventType:
var e TaskStateChangeEvent
err := json.Unmarshal(event, &e)
if err != nil {
return nil, "", err
}

return e, base.EventType, nil
default:
return nil, base.EventType, fmt.Errorf("unknown event type %s", base.EventType)
}
}

// NewTaskStateChangeEvent creates a new event notifying of a change in task state
func NewTaskStateChangeEvent(t *Task) (*TaskStateChangeEvent, error) {
eid, err := ksuid.NewRandom()
if err != nil {
return nil, err
}

e := &TaskStateChangeEvent{
TaskID: t.ID,
State: t.State,
Tries: t.Tries,
Queue: t.Queue,
TaskType: t.Type,
LastErr: t.LastErr,
BaseEvent: BaseEvent{
EventID: eid.String(),
TimeStamp: eid.Time().UnixNano(),
EventType: TaskStateChangeEventType,
},
}

if !t.CreatedAt.IsZero() {
e.Age = time.Since(t.CreatedAt)
}

return e, nil
}
11 changes: 5 additions & 6 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ type processor struct {
limiter chan struct{}
retryPolicy RetryPolicyProvider
log Logger
mu *sync.Mutex

mu *sync.Mutex
}

// ItemKind indicates the kind of job a work queue entry represents
Expand Down Expand Up @@ -88,15 +89,13 @@ func (p *processor) processMessage(ctx context.Context, item *ProcessItem) error
return fmt.Errorf("already in state %q", task.State)
}

if task.Deadline != nil && time.Since(*task.Deadline) > 0 {
if task.IsPastDeadline() {
workQueueEntryPastDeadlineCounter.WithLabelValues(p.queue.Name).Inc()
task.State = TaskStateExpired
err = p.c.storage.SaveTaskState(ctx, task)
err = p.c.handleTaskExpired(ctx, task)
if err != nil {
p.log.Errorf("Could not set task %s to expired state: %v", task.ID, err)
p.log.Warnf("Could not expire task %s: %v", task.ID, err)
}
return fmt.Errorf("past deadline")

}

err = p.c.setTaskActive(ctx, task)
Expand Down
4 changes: 2 additions & 2 deletions processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,13 +246,13 @@ var _ = Describe("Processor", func() {

<-proc.limiter
task.State = TaskStateCompleted
Expect(client.storage.SaveTaskState(ctx, task)).ToNot(HaveOccurred())
Expect(client.storage.SaveTaskState(ctx, task, false)).ToNot(HaveOccurred())
err = proc.processMessage(ctx, &ProcessItem{JobID: task.ID})
Expect(err).To(MatchError("already in state \"complete\""))

<-proc.limiter
task.State = TaskStateExpired
Expect(client.storage.SaveTaskState(ctx, task)).ToNot(HaveOccurred())
Expect(client.storage.SaveTaskState(ctx, task, false)).ToNot(HaveOccurred())
err = proc.processMessage(ctx, &ProcessItem{JobID: task.ID})
Expect(err).To(MatchError("already in state \"expired\""))

Expand Down
Loading

0 comments on commit 90b4611

Please sign in to comment.