-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathlifecycle.go
135 lines (114 loc) · 3.48 KB
/
lifecycle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
// Copyright (c) 2022, R.I. Pienaar and the Project contributors
//
// SPDX-License-Identifier: Apache-2.0
package asyncjobs
import (
"encoding/json"
"fmt"
"time"
"github.com/segmentio/ksuid"
)
// BaseEvent is present in all event types and can be used to detect the type,
// ParseEventJSON decodes into it first and then selects the concrete event
// type based on EventType
type BaseEvent struct {
// EventID is the unique ID of the event, the constructors in this file set
// it to the string form of a newly generated KSUID
EventID string `json:"event_id"`
// EventType identifies the kind of event, see the *EventType constants
EventType string `json:"type"`
// TimeStamp is when the event was created, the constructors in this file set
// it to the time embedded in the event's KSUID, converted to UTC
TimeStamp time.Time `json:"timestamp"`
}
// TaskStateChangeEvent notifies that a significant change occurred in a Task
type TaskStateChangeEvent struct {
BaseEvent
// TaskID is the ID of the task, use with LoadTaskByID() to access the task
TaskID string `json:"task_id"`
// State is the new state of the Task
State TaskState `json:"state"`
// Tries is how many times the Task has been processed
Tries int `json:"tries"`
// Queue is the queue the task is in, can be empty
Queue string `json:"queue,omitempty"`
// TaskType is the task routing type
TaskType string `json:"task_type"`
// LastErr is the error that caused a task to change state for error state changes
LastErr string `json:"last_error,omitempty"`
// Age is the time since the task was created, it is left at zero, and so
// omitted from the JSON, when the task has no creation time.
// NOTE(review): this comment used to say "in milliseconds", but time.Duration
// counts nanoseconds and no conversion to milliseconds is visible in this
// file - confirm which unit consumers of task_age expect
Age time.Duration `json:"task_age,omitempty"`
}
// LeaderElectedEvent notifies that a leader election was won, it is created
// using NewLeaderElectedEvent
type LeaderElectedEvent struct {
BaseEvent
// Name of the process that gained leadership
Name string `json:"name"`
// Component is the component that is reporting
Component string `json:"component"`
}
// Event type identifiers, these are the values carried in the "type" field of
// an event and are what ParseEventJSON matches against
const (
// TaskStateChangeEventType is the event type for TaskStateChangeEvent events
TaskStateChangeEventType = "io.choria.asyncjobs.v1.task_state"
// LeaderElectedEventType is the event type for LeaderElectedEvent events
LeaderElectedEventType = "io.choria.asyncjobs.v1.leader_elected"
)
// ParseEventJSON decodes a JSON encoded event into its concrete event type
//
// The "type" field of the payload selects the type to decode into. On success
// the event is returned by value, as a TaskStateChangeEvent or a
// LeaderElectedEvent, together with its event type. A payload that fails to
// decode yields a nil event, an empty event type and the decoding error. An
// unrecognized type yields a nil event, the type that was found and an error
// wrapping ErrUnknownEventType.
func ParseEventJSON(event []byte) (any, string, error) {
	// a first pass over the shared fields is enough to learn the event type
	var header BaseEvent
	if err := json.Unmarshal(event, &header); err != nil {
		return nil, "", err
	}

	switch kind := header.EventType; kind {
	case TaskStateChangeEventType:
		var stateChange TaskStateChangeEvent
		if err := json.Unmarshal(event, &stateChange); err != nil {
			return nil, "", err
		}

		return stateChange, kind, nil

	case LeaderElectedEventType:
		var elected LeaderElectedEvent
		if err := json.Unmarshal(event, &elected); err != nil {
			return nil, "", err
		}

		return elected, kind, nil

	default:
		return nil, kind, fmt.Errorf("%w: %s", ErrUnknownEventType, kind)
	}
}
// NewLeaderElectedEvent builds the event announcing that name won a leader
// election on behalf of component
//
// The event ID is a new random KSUID and the event timestamp is the time
// embedded in that KSUID, in UTC. An error is returned only when the KSUID
// cannot be generated.
func NewLeaderElectedEvent(name string, component string) (*LeaderElectedEvent, error) {
	id, err := ksuid.NewRandom()
	if err != nil {
		return nil, err
	}

	elected := LeaderElectedEvent{
		BaseEvent: BaseEvent{
			EventID:   id.String(),
			EventType: LeaderElectedEventType,
			TimeStamp: id.Time().UTC(),
		},
		Name:      name,
		Component: component,
	}

	return &elected, nil
}
// NewTaskStateChangeEvent creates a new event notifying of a change in task state
//
// The event is populated from the ID, State, Tries, Queue, Type and LastErr of
// t, which must not be nil. The event ID is a new random KSUID and the event
// timestamp is the time embedded in that KSUID, in UTC. Age is set to the time
// elapsed since t.CreatedAt, rounded to the nearest millisecond, and is left at
// zero when t.CreatedAt is not set. An error is returned only when the KSUID
// cannot be generated.
func NewTaskStateChangeEvent(t *Task) (*TaskStateChangeEvent, error) {
	eid, err := ksuid.NewRandom()
	if err != nil {
		return nil, err
	}

	e := &TaskStateChangeEvent{
		TaskID:   t.ID,
		State:    t.State,
		Tries:    t.Tries,
		Queue:    t.Queue,
		TaskType: t.Type,
		LastErr:  t.LastErr,
		BaseEvent: BaseEvent{
			EventID:   eid.String(),
			TimeStamp: eid.Time().UTC(),
			EventType: TaskStateChangeEventType,
		},
	}

	if !t.CreatedAt.IsZero() {
		// Round the elapsed duration, not the creation time: rounding
		// CreatedAt can move it up to half a millisecond into the future,
		// which yields a negative Age for a task that was only just created
		// and never yields a millisecond-granular value
		e.Age = time.Since(t.CreatedAt).Round(time.Millisecond)
	}

	return e, nil
}