From 01f6809c6e32f98a942bb73676b14602dc4d7c99 Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Mon, 7 Feb 2022 09:40:45 +0100 Subject: [PATCH] (#15) support basic lifecycle events Events emitted for task changes initially. ajc task watch updated to use these events Signed-off-by: R.I.Pienaar --- README.md | 6 +++- ajc/task_command.go | 47 ++++++++++--------------- client.go | 34 +++++++----------- client_test.go | 6 ++-- go.mod | 2 +- go.sum | 4 +-- lifecycle.go | 85 +++++++++++++++++++++++++++++++++++++++++++++ processor.go | 11 +++--- processor_test.go | 4 +-- storage.go | 40 ++++++++++++++++++--- storage_test.go | 38 ++++++++++++++++---- task.go | 5 +++ 12 files changed, 206 insertions(+), 76 deletions(-) create mode 100644 lifecycle.go diff --git a/README.md b/README.md index 8ff951a..19fb3fe 100644 --- a/README.md +++ b/README.md @@ -30,11 +30,15 @@ This package heavily inspired by [hibiken/asynq](https://github.com/hibiken/asyn ## Status -This is a brand-new project, under heavy development and relies on unreleased behaviors in JetStream. Interfaces might change, +This is a brand-new project, under heavy development. Interfaces might change, Structures might change, features might be removed if it's found to be a bad fit for the underlying storage. Use with care. +## Requirements + +NATS 2.7.2 with JetStream enabled. + ## Features This feature list is incomplete, at present the focus is on determining what will work well for the particular patterns diff --git a/ajc/task_command.go b/ajc/task_command.go index 1ad4397..41d2a42 100644 --- a/ajc/task_command.go +++ b/ajc/task_command.go @@ -74,7 +74,8 @@ func configureTaskCommand(app *kingpin.Application) { config := tasks.Command("configure", "Configures the Task storage").Alias("config").Alias("cfg").Action(c.configAction) config.Arg("retention", "Sets how long Tasks are kept in the Task Store").Required().DurationVar(&c.retention) - tasks.Command("watch", "Watch job updates in real time").Action(c.watchAction) + watch := tasks.Command("watch", "Watch job updates in real time").Action(c.watchAction) + watch.Flag("task", "Watch for updates related to a specific task ID").StringVar(&c.id) process := tasks.Command("process", "Process Tasks from a given queue").Action(c.processAction) process.Arg("type", "Types of Tasks to process").Required().Envar("AJC_TYPE").StringVar(&c.ttype) @@ -125,18 +126,17 @@ func (c *taskCommand) watchAction(_ *kingpin.ParseContext) error { return err } - mgr, stream, err := admin.TasksStore() + mgr, _, err := admin.TasksStore() if err != nil { return err } - nc := mgr.NatsConn() - sub, err := mgr.NatsConn().SubscribeSync(nc.NewRespInbox()) - if err != nil { - return err + target := asyncjobs.EventsSubjectWildcard + if c.id != "" { + target = fmt.Sprintf(asyncjobs.TaskStateChangeEventSubjectPattern, c.id) } - _, err = stream.NewConsumer(jsm.StartWithLastReceived(), jsm.DeliverySubject(sub.Subject), jsm.AcknowledgeNone(), jsm.PushFlowControl(), jsm.IdleHeartbeat(time.Minute)) + sub, err := mgr.NatsConn().SubscribeSync(target) if err != nil { return err } @@ -147,32 +147,23 @@ func (c *taskCommand) watchAction(_ *kingpin.ParseContext) error { return err } - if len(msg.Data) == 0 { - if msg.Reply != "" { - msg.Respond(nil) - } - - continue - } - - task := &asyncjobs.Task{} - err = json.Unmarshal(msg.Data, task) + event, kind, err := asyncjobs.ParseEventJSON(msg.Data) if err != nil { - fmt.Printf("Invalid task update received: %v: %q\n", err, msg.Data) - continue + fmt.Printf("Could not parse event: %v\n", err) } - ts := time.Now() - if task.LastTriedAt != nil && !task.LastTriedAt.IsZero() { - ts = *task.LastTriedAt - } + switch e := event.(type) { + case asyncjobs.TaskStateChangeEvent: + ts := time.Unix(0, e.TimeStamp) + if e.LastErr == "" { + fmt.Printf("[%s] %s: queue: %s type: %s tries: %d state: %s\n", ts.Format("15:04:05"), e.TaskID, e.Queue, e.TaskType, e.Tries, e.State) + } else { + fmt.Printf("[%s] %s: queue: %s type: %s tries: %d state: %s error: %s\n", ts.Format("15:04:05"), e.TaskID, e.Queue, e.TaskType, e.Tries, e.State, e.LastErr) + } - if task.LastErr == "" { - fmt.Printf("[%s] %s: queue: %s type: %s tries: %d state: %s\n", ts.Format("15:04:05"), task.ID, task.Queue, task.Type, task.Tries, task.State) - } else { - fmt.Printf("[%s] %s: queue: %s type: %s tries: %d state: %s error: %s\n", ts.Format("15:04:05"), task.ID, task.Queue, task.Type, task.Tries, task.State, task.LastErr) + default: + fmt.Printf("[%s] Unknown event type %s\n", time.Now().UTC().Format("15:04:05"), kind) } - } } diff --git a/client.go b/client.go index 2622891..c4bad2b 100644 --- a/client.go +++ b/client.go @@ -43,10 +43,11 @@ var ( // Storage implements the backend access type Storage interface { - SaveTaskState(ctx context.Context, task *Task) error + SaveTaskState(ctx context.Context, task *Task, notify bool) error EnqueueTask(ctx context.Context, queue *Queue, task *Task) error RetryTaskByID(ctx context.Context, queue *Queue, id string) error LoadTaskByID(id string) (*Task, error) + PublishTaskStateChangeEvent(ctx context.Context, task *Task) error AckItem(ctx context.Context, item *ProcessItem) error NakItem(ctx context.Context, item *ProcessItem) error TerminateItem(ctx context.Context, item *ProcessItem) error @@ -177,7 +178,7 @@ func (c *Client) setTaskActive(ctx context.Context, t *Task) error { t.LastTriedAt = nowPointer() t.LastErr = "" - return c.storage.SaveTaskState(ctx, t) + return c.storage.SaveTaskState(ctx, t, true) } func (c *Client) shouldDiscardTask(t *Task) bool { @@ -190,9 +191,9 @@ func (c *Client) shouldDiscardTask(t *Task) bool { return false } -func (c *Client) discardTaskIfDesired(t *Task) error { +func (c *Client) saveOrDiscardTaskIfDesired(ctx context.Context, t *Task) error { if !c.shouldDiscardTask(t) { - return nil + return c.storage.SaveTaskState(ctx, t, true) } c.log.Debugf("Discarding task with state %s based on desired discards %q", t.State, c.opts.discard) @@ -209,12 +210,7 @@ func (c *Client) setTaskSuccess(ctx context.Context, t *Task, payload interface{ CompletedAt: time.Now().UTC(), } - err := c.storage.SaveTaskState(ctx, t) - if err != nil { - return err - } - - return c.discardTaskIfDesired(t) + return c.saveOrDiscardTaskIfDesired(ctx, t) } func (c *Client) handleTaskTerminated(ctx context.Context, t *Task, terr error) error { @@ -222,12 +218,13 @@ func (c *Client) handleTaskTerminated(ctx context.Context, t *Task, terr error) t.LastTriedAt = nowPointer() t.State = TaskStateTerminated - err := c.storage.SaveTaskState(ctx, t) - if err != nil { - return err - } + return c.saveOrDiscardTaskIfDesired(ctx, t) +} + +func (c *Client) handleTaskExpired(ctx context.Context, t *Task) error { + t.State = TaskStateExpired - return c.discardTaskIfDesired(t) + return c.saveOrDiscardTaskIfDesired(ctx, t) } func (c *Client) handleTaskError(ctx context.Context, t *Task, terr error) error { @@ -242,12 +239,7 @@ func (c *Client) handleTaskError(ctx context.Context, t *Task, terr error) error } } - err := c.storage.SaveTaskState(ctx, t) - if err != nil { - return err - } - - return c.discardTaskIfDesired(t) + return c.saveOrDiscardTaskIfDesired(ctx, t) } func (c *Client) setupQueues() error { diff --git a/client_test.go b/client_test.go index 64670e9..a696d4f 100644 --- a/client_test.go +++ b/client_test.go @@ -87,7 +87,7 @@ var _ = Describe("Client", func() { }) }) - Describe("discardTaskIfDesired", func() { + Describe("saveOrDiscardTaskIfDesired", func() { It("Should delete the correct tasks", func() { withJetStream(func(nc *nats.Conn, mgr *jsm.Manager) { client, err := NewClient(NatsConn(nc), DiscardTaskStates(TaskStateExpired, TaskStateCompleted)) @@ -97,12 +97,12 @@ var _ = Describe("Client", func() { Expect(err).ToNot(HaveOccurred()) Expect(client.EnqueueTask(context.Background(), task)).ToNot(HaveOccurred()) - Expect(client.discardTaskIfDesired(task)).ToNot(HaveOccurred()) + Expect(client.saveOrDiscardTaskIfDesired(context.Background(), task)).ToNot(HaveOccurred()) _, err = client.LoadTaskByID(task.ID) Expect(err).ToNot(HaveOccurred()) task.State = TaskStateExpired - Expect(client.discardTaskIfDesired(task)).ToNot(HaveOccurred()) + Expect(client.saveOrDiscardTaskIfDesired(context.Background(), task)).ToNot(HaveOccurred()) _, err = client.LoadTaskByID(task.ID) Expect(err).To(MatchError("task not found")) }) diff --git a/go.mod b/go.mod index 05e786a..e7da3fa 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/AlecAivazis/survey/v2 v2.3.2 github.com/dustin/go-humanize v1.0.0 github.com/nats-io/jsm.go v0.0.28-0.20220128163911-90cd1007b323 - github.com/nats-io/nats-server/v2 v2.7.2-0.20220201222209-2ed7a812d8b2 + github.com/nats-io/nats-server/v2 v2.7.2 github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d github.com/onsi/ginkgo/v2 v2.1.1 github.com/onsi/gomega v1.17.0 diff --git a/go.sum b/go.sum index d3c8b7a..012bacf 100644 --- a/go.sum +++ b/go.sum @@ -192,8 +192,8 @@ github.com/nats-io/jsm.go v0.0.28-0.20220128163911-90cd1007b323/go.mod h1:HU1JmK github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY= github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= github.com/nats-io/nats-server/v2 v2.7.2-0.20220126224453-26b692ee73c0/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8= -github.com/nats-io/nats-server/v2 v2.7.2-0.20220201222209-2ed7a812d8b2 h1:/ocgZt+pxx9ocGWWdeBAJfg0tqoz4uUoIgCNepKnnDQ= -github.com/nats-io/nats-server/v2 v2.7.2-0.20220201222209-2ed7a812d8b2/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8= +github.com/nats-io/nats-server/v2 v2.7.2 h1:+LEN8m0+jdCkiGc884WnDuxR+qj80/5arj+szKuRpRI= +github.com/nats-io/nats-server/v2 v2.7.2/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8= github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d h1:GRSmEJutHkdoxKsRypP575IIdoXe7Bm6yHQF6GcDBnA= github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= diff --git a/lifecycle.go b/lifecycle.go new file mode 100644 index 0000000..433b60d --- /dev/null +++ b/lifecycle.go @@ -0,0 +1,85 @@ +// Copyright (c) 2022, R.I. Pienaar and the Project contributors +// +// SPDX-License-Identifier: Apache-2.0 + +package asyncjobs + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/segmentio/ksuid" +) + +type BaseEvent struct { + EventID string `json:"event_id"` + EventType string `json:"type"` + TimeStamp int64 `json:"timestamp"` +} + +type TaskStateChangeEvent struct { + TaskID string `json:"task_id"` + State TaskState `json:"state"` + Tries int `json:"tries"` + Queue string `json:"queue"` + TaskType string `json:"task_type"` + LastErr string `json:"last_error,omitempty"` + Age time.Duration `json:"task_age,omitempty"` + + BaseEvent +} + +const ( + // TaskStateChangeEventType is the event type for TaskStateChangeEvent types + TaskStateChangeEventType = "io.choria.asyncjobs.v1.task_state" +) + +func ParseEventJSON(event []byte) (interface{}, string, error) { + var base BaseEvent + err := json.Unmarshal(event, &base) + if err != nil { + return nil, "", err + } + + switch base.EventType { + case TaskStateChangeEventType: + var e TaskStateChangeEvent + err := json.Unmarshal(event, &e) + if err != nil { + return nil, "", err + } + + return e, base.EventType, nil + default: + return nil, base.EventType, fmt.Errorf("unknown event type %s", base.EventType) + } +} + +// NewTaskStateChangeEvent creates a new event notifying of a change in task state +func NewTaskStateChangeEvent(t *Task) (*TaskStateChangeEvent, error) { + eid, err := ksuid.NewRandom() + if err != nil { + return nil, err + } + + e := &TaskStateChangeEvent{ + TaskID: t.ID, + State: t.State, + Tries: t.Tries, + Queue: t.Queue, + TaskType: t.Type, + LastErr: t.LastErr, + BaseEvent: BaseEvent{ + EventID: eid.String(), + TimeStamp: eid.Time().UnixNano(), + EventType: TaskStateChangeEventType, + }, + } + + if !t.CreatedAt.IsZero() { + e.Age = time.Since(t.CreatedAt) + } + + return e, nil +} diff --git a/processor.go b/processor.go index eb234b6..1d92236 100644 --- a/processor.go +++ b/processor.go @@ -23,7 +23,8 @@ type processor struct { limiter chan struct{} retryPolicy RetryPolicyProvider log Logger - mu *sync.Mutex + + mu *sync.Mutex } // ItemKind indicates the kind of job a work queue entry represents @@ -88,15 +89,13 @@ func (p *processor) processMessage(ctx context.Context, item *ProcessItem) error return fmt.Errorf("already in state %q", task.State) } - if task.Deadline != nil && time.Since(*task.Deadline) > 0 { + if task.IsPastDeadline() { workQueueEntryPastDeadlineCounter.WithLabelValues(p.queue.Name).Inc() - task.State = TaskStateExpired - err = p.c.storage.SaveTaskState(ctx, task) + err = p.c.handleTaskExpired(ctx, task) if err != nil { - p.log.Errorf("Could not set task %s to expired state: %v", task.ID, err) + p.log.Warnf("Could not expire task %s: %v", task.ID, err) } return fmt.Errorf("past deadline") - } err = p.c.setTaskActive(ctx, task) diff --git a/processor_test.go b/processor_test.go index 097785f..663133c 100644 --- a/processor_test.go +++ b/processor_test.go @@ -246,13 +246,13 @@ var _ = Describe("Processor", func() { <-proc.limiter task.State = TaskStateCompleted - Expect(client.storage.SaveTaskState(ctx, task)).ToNot(HaveOccurred()) + Expect(client.storage.SaveTaskState(ctx, task, false)).ToNot(HaveOccurred()) err = proc.processMessage(ctx, &ProcessItem{JobID: task.ID}) Expect(err).To(MatchError("already in state \"complete\"")) <-proc.limiter task.State = TaskStateExpired - Expect(client.storage.SaveTaskState(ctx, task)).ToNot(HaveOccurred()) + Expect(client.storage.SaveTaskState(ctx, task, false)).ToNot(HaveOccurred()) err = proc.processMessage(ctx, &ProcessItem{JobID: task.ID}) Expect(err).To(MatchError("already in state \"expired\"")) diff --git a/storage.go b/storage.go index 15ec515..a194a8b 100644 --- a/storage.go +++ b/storage.go @@ -30,6 +30,13 @@ const ( // TasksStreamSubjectPattern is the printf pattern that can be used to find an individual task by its task ID TasksStreamSubjectPattern = "CHORIA_AJ.T.%s" + // EventsSubjectWildcard is the NATS wildcard for receiving all events + EventsSubjectWildcard = "CHORIA_AJ.E.>" + // TaskStateChangeEventSubjectPattern is a printf pattern for determining the event publish subject + TaskStateChangeEventSubjectPattern = "CHORIA_AJ.E.task_state.%s" + // TaskStateChangeEventSubjectWildcard is a NATS wildcard for receiving all TaskStateChangeEvent messages + TaskStateChangeEventSubjectWildcard = "CHORIA_AJ.E.task_state.*" + // WorkStreamNamePattern is the printf pattern for determining JetStream Stream names per queue WorkStreamNamePattern = "CHORIA_AJ_Q_%s" // WorkStreamSubjectPattern is the printf pattern individual items are placed in, placeholders for JobID and JobType @@ -87,7 +94,23 @@ func newJetStreamStorage(nc *nats.Conn, rp RetryPolicyProvider, log Logger) (*je return s, nil } -func (s *jetStreamStorage) SaveTaskState(ctx context.Context, task *Task) error { +func (s *jetStreamStorage) PublishTaskStateChangeEvent(ctx context.Context, task *Task) error { + e, err := NewTaskStateChangeEvent(task) + if err != nil { + return err + } + + ej, err := json.Marshal(e) + if err != nil { + return err + } + + target := fmt.Sprintf(TaskStateChangeEventSubjectPattern, task.ID) + s.log.Debugf("Publishing lifecycle event %s for task %s to %s", e.TaskType, task.ID, target) + return s.nc.Publish(target, ej) +} + +func (s *jetStreamStorage) SaveTaskState(ctx context.Context, task *Task, notify bool) error { jt, err := json.Marshal(task) if err != nil { return err @@ -124,6 +147,10 @@ func (s *jetStreamStorage) SaveTaskState(ctx context.Context, task *Task) error taskUpdateCounter.WithLabelValues(string(task.State)).Inc() + if notify { + return s.PublishTaskStateChangeEvent(ctx, task) + } + return nil } @@ -151,7 +178,7 @@ func (s *jetStreamStorage) EnqueueTask(ctx context.Context, queue *Queue, task * } task.Queue = queue.Name - err = s.SaveTaskState(ctx, task) + err = s.SaveTaskState(ctx, task, true) if err != nil { return err } @@ -170,7 +197,8 @@ func (s *jetStreamStorage) EnqueueTask(ctx context.Context, queue *Queue, task * if err != nil { enqueueErrorCounter.WithLabelValues(queue.Name).Inc() task.State = TaskStateQueueError - if err := s.SaveTaskState(ctx, task); err != nil { + task.LastErr = err.Error() + if err := s.SaveTaskState(ctx, task, true); err != nil { return err } return err @@ -180,7 +208,8 @@ func (s *jetStreamStorage) EnqueueTask(ctx context.Context, queue *Queue, task * if err != nil { enqueueErrorCounter.WithLabelValues(queue.Name).Inc() task.State = TaskStateQueueError - if err := s.SaveTaskState(ctx, task); err != nil { + task.LastErr = err.Error() + if err := s.SaveTaskState(ctx, task, true); err != nil { return err } return err @@ -188,7 +217,8 @@ func (s *jetStreamStorage) EnqueueTask(ctx context.Context, queue *Queue, task * if ack.Duplicate { enqueueErrorCounter.WithLabelValues(queue.Name).Inc() task.State = TaskStateQueueError - if err := s.SaveTaskState(ctx, task); err != nil { + task.LastErr = err.Error() + if err := s.SaveTaskState(ctx, task, true); err != nil { return err } return fmt.Errorf("duplicate work queue item") diff --git a/storage_test.go b/storage_test.go index c692e83..0083893 100644 --- a/storage_test.go +++ b/storage_test.go @@ -703,7 +703,7 @@ var _ = Describe("Storage", func() { task, err := NewTask("ginkgo", nil) Expect(err).ToNot(HaveOccurred()) - err = storage.SaveTaskState(ctx, task) + err = storage.SaveTaskState(ctx, task, false) if err != nats.ErrNoResponders { Fail(fmt.Sprintf("Unexpected error: %v", err)) } @@ -721,7 +721,7 @@ var _ = Describe("Storage", func() { task, err := NewTask("ginkgo", nil) Expect(err).ToNot(HaveOccurred()) - err = storage.SaveTaskState(ctx, task) + err = storage.SaveTaskState(ctx, task, false) Expect(err).ToNot(HaveOccurred()) tasks, err := mgr.LoadStream(TasksStreamName) @@ -746,7 +746,7 @@ var _ = Describe("Storage", func() { task, err := NewTask("ginkgo", nil) Expect(err).ToNot(HaveOccurred()) - err = storage.SaveTaskState(ctx, task) + err = storage.SaveTaskState(ctx, task, false) Expect(err).ToNot(HaveOccurred()) Expect(task.storageOptions).ToNot(BeNil()) so := task.storageOptions.(*taskMeta) @@ -762,7 +762,7 @@ var _ = Describe("Storage", func() { Expect(header.Get(api.JSExpectedLastSubjSeq)).To(Equal("0")) Expect(msg.Sequence).To(Equal(uint64(1))) - err = storage.SaveTaskState(ctx, task) + err = storage.SaveTaskState(ctx, task, false) Expect(err).ToNot(HaveOccurred()) Expect(task.storageOptions).ToNot(BeNil()) so = task.storageOptions.(*taskMeta) @@ -787,19 +787,43 @@ var _ = Describe("Storage", func() { task, err := NewTask("ginkgo", nil) Expect(err).ToNot(HaveOccurred()) - err = storage.SaveTaskState(ctx, task) + err = storage.SaveTaskState(ctx, task, false) Expect(err).ToNot(HaveOccurred()) // force the failure task.storageOptions.(*taskMeta).seq = 10 - err = storage.SaveTaskState(ctx, task) + err = storage.SaveTaskState(ctx, task, false) if !jsm.IsNatsError(err, 10071) { Fail(fmt.Sprintf("Unexpected error %v", err)) } }) }) + It("Should publish events", func() { + withJetStream(func(nc *nats.Conn, mgr *jsm.Manager) { + storage, err := newJetStreamStorage(nc, retryForTesting, &defaultLogger{}) + Expect(err).ToNot(HaveOccurred()) + Expect(storage.PrepareTasks(true, 1, time.Hour)).ToNot(HaveOccurred()) + + task, err := NewTask("ginkgo", nil) + Expect(err).ToNot(HaveOccurred()) + + sub, err := nc.SubscribeSync(fmt.Sprintf(TaskStateChangeEventSubjectPattern, task.ID)) + Expect(err).ToNot(HaveOccurred()) + + Expect(storage.SaveTaskState(ctx, task, true)).ToNot(HaveOccurred()) + + event, err := sub.NextMsg(time.Second) + Expect(err).ToNot(HaveOccurred()) + + e, kind, err := ParseEventJSON(event.Data) + Expect(err).ToNot(HaveOccurred()) + Expect(kind).To(Equal(TaskStateChangeEventType)) + Expect(e.(TaskStateChangeEvent).TaskID).To(Equal(task.ID)) + }) + }) + It("Should save the task correctly", func() { withJetStream(func(nc *nats.Conn, mgr *jsm.Manager) { storage, err := newJetStreamStorage(nc, retryForTesting, &defaultLogger{}) @@ -811,7 +835,7 @@ var _ = Describe("Storage", func() { task, err := NewTask("ginkgo", nil) Expect(err).ToNot(HaveOccurred()) - err = storage.SaveTaskState(ctx, task) + err = storage.SaveTaskState(ctx, task, false) Expect(err).ToNot(HaveOccurred()) lt, err := storage.LoadTaskByID(task.ID) diff --git a/task.go b/task.go index 3a82d7b..6f15002 100644 --- a/task.go +++ b/task.go @@ -110,6 +110,11 @@ func NewTask(taskType string, payload interface{}, opts ...TaskOpt) (*Task, erro return t, nil } +// IsPastDeadline determines if the task is past it's deadline +func (t *Task) IsPastDeadline() bool { + return t.Deadline != nil && time.Since(*t.Deadline) > 0 +} + // TaskOpt configures Tasks made using NewTask() type TaskOpt func(*Task) error