Add retry_delay ability to flowlord (#153)
* Add retry_delay ability to flowlord
- fix issues with hot loading workflows

* sqlload: prevent failure on zero data
jbsmith7741 authored Mar 5, 2021
1 parent 5b9f125 commit b9a5a34
Showing 8 changed files with 78 additions and 24 deletions.
17 changes: 9 additions & 8 deletions apps/taskmasters/flowlord/README.md
@@ -17,23 +17,24 @@ template = "?date={yyyy}-{mm}-{dd}"
[[Phase]]
task = "task1"
dependsOn = "topic_name"
rule = ""
rule = "retry_delay=30s"
retry = 3
template = "?hour={yyyy}-{mm}-{dd}T{hh}"
```

### Phase

- task: the name of the topic this task will be sent to. It is also the unique name of the task. In Addition a job of a task can be added in the task name using a colon (:) as a separator (task:job)
- dependsOn:
- **task**: the name of the topic this task will be sent to; it is also the unique name of the task. In addition, a job can be added to the task name using a colon (:) as a separator (task:job)
- **dependsOn**:
  - the name of the parent task
  - this task will start after the parent task has completed successfully
  - if left blank, this task will only start based on the rule
- rule: rules on about the tasks that are encoded as query params
- cron: schedule the task based on the cron pattern (see scheduling)
- require: used in a child task saying to only start task if value is present
- retry: the number of times a task is retried before being sent to failed_tasks
- template: a URL string that is parsed and put into the task's info string when created
- **rule**: rules about the task, encoded as query params (parsed as shown in the sketch after this list)
  - _cron_: schedule the task based on the cron pattern (see scheduling)
  - _require_: used in a child task to start it only if the required value is present
  - _retry_delay_: the duration to wait before retrying a failed task (with up to 40% random jitter added)
- **retry**: the number of times a task is retried before being sent to failed_tasks
- **template**: a URL string that is parsed and put into the task's info string when created
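Because a rule is just a query string, it can be read with Go's standard library. Below is a minimal sketch of how a `retry_delay` value comes out of a rule (the rule value is illustrative; in the committed Process code a missing or malformed value simply leaves the delay at zero):

```go
package main

import (
	"fmt"
	"net/url"
	"time"
)

func main() {
	// A rule as it would appear in a Phase definition.
	rules, _ := url.ParseQuery("cron=0 * * * *&retry_delay=30s")

	fmt.Println(rules.Get("cron")) // 0 * * * *

	if s := rules.Get("retry_delay"); s != "" {
		delay, _ := time.ParseDuration(s)
		fmt.Println(delay) // 30s
	}
}
```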

### Template
Templating is used to create dynamic tasks based on the run time or on previously run jobs. Templates are designated with surrounding brackets `{}`.
47 changes: 43 additions & 4 deletions apps/taskmasters/flowlord/taskmaster.go
@@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"log"
"math/rand"
"net/url"
"regexp"
"strconv"
@@ -24,6 +25,10 @@ import (
"github.com/pcelvng/task-tools/workflow"
)

func init() {
rand.Seed(time.Now().Unix())
}

type taskMaster struct {
initTime time.Time
nextUpdate time.Time
@@ -224,9 +229,11 @@ func (tm *taskMaster) schedule() (err error) {
// Send retry failed tasks to tm.failedTopic (only if the phase exists in the workflow)
func (tm *taskMaster) Process(t *task.Task) error {
meta, _ := url.ParseQuery(t.Meta)
// attempt to return

// attempt to retry
if t.Result == task.ErrResult {
p := tm.Get(*t)
rules, _ := url.ParseQuery(p.Rule)

r := meta.Get("retry")
i, _ := strconv.Atoi(r)
@@ -235,13 +242,23 @@
return nil
}
if p.Retry > i {
var delay time.Duration
if s := rules.Get("retry_delay"); s != "" {
delay, _ = time.ParseDuration(s)
delay = delay + jitterPercent(delay, 40)
meta.Set("delayed", gtools.PrintDuration(delay))
}
t = task.NewWithID(t.Type, t.Info, t.ID)
i++
meta.Set("retry", strconv.Itoa(i))
t.Meta = meta.Encode()
if err := tm.producer.Send(t.Type, t.JSONBytes()); err != nil {
return err
}
go func() {
time.Sleep(delay)
if err := tm.producer.Send(t.Type, t.JSONBytes()); err != nil {
log.Println(err)
}
}()
return nil
} else if tm.failedTopic != "-" {
// send to the retry failed topic if retries > p.Retry
meta.Set("retry", "failed")
Expand Down Expand Up @@ -321,3 +338,25 @@ func (tm *taskMaster) read(ctx context.Context) {
}
}
}

// jitterPercent will return a time.Duration representing extra
// 'jitter' to be added to the wait time. Jitter is important
// in retry events since the original cause of failure can be
// due to too many jobs being processed at a time.
//
// By adding some jitter the retry events won't all happen
// at once but will get staggered to prevent the problem
// from happening again.
//
// 'p' is a percentage of the wait time. Duration returned
// is a random duration between 0 and p. 'p' should be a value
// between 0-100.
func jitterPercent(wait time.Duration, p int64) time.Duration {
if wait == 0 {
return 0
}
maxJitter := (int64(wait) * p) / 100
if maxJitter <= 0 {
return 0 // rand.Int63n panics on a non-positive bound
}

return time.Duration(rand.Int63n(maxJitter))
}
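For a sense of scale: with `retry_delay=30s` and `p` = 40 (the value Process passes), `maxJitter` is 12s, so each retry sleeps for a uniform duration in [30s, 42s). A standalone sketch of the same arithmetic:

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	wait := 30 * time.Second // rule: retry_delay=30s

	// 40% of 30s leaves up to 12s of extra jitter.
	maxJitter := (int64(wait) * 40) / 100

	for i := 0; i < 3; i++ {
		jitter := time.Duration(rand.Int63n(maxJitter))
		fmt.Println(wait + jitter) // somewhere in [30s, 42s)
	}
}
```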
12 changes: 10 additions & 2 deletions apps/taskmasters/flowlord/taskmaster_test.go
@@ -2,8 +2,11 @@ package main

import (
"encoding/json"
"regexp"
"sort"
"strings"
"testing"
"time"

"github.com/hydronica/trial"
"github.com/pcelvng/task"
@@ -13,6 +16,7 @@
)

func TestTaskMaster_Process(t *testing.T) {
delayRegex := regexp.MustCompile(`delayed=(\d+.\d+)`)
cache, err := workflow.New("../../../internal/test/workflow/f1.toml", nil)
if err != nil {
t.Fatal("cache init", err)
@@ -31,6 +35,7 @@
tm.producer = producer
nop.FakeMsg = tsk.JSONBytes()
err = tm.Process(&tsk)
time.Sleep(100 * time.Millisecond)
result := make([]task.Task, 0)
for _, msgs := range producer.Messages {
for _, msg := range msgs {
@@ -39,6 +44,9 @@
return nil, err
}
v.Created = ""
if s := delayRegex.FindStringSubmatch(v.Meta); len(s) > 1 {
v.Meta = strings.Replace(v.Meta, s[1], "XX", 1)
}
result = append(result, v)
}
}
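The jitter makes the delayed value nondeterministic, so the test masks the digits before comparing against fixed expectations. The same normalization trick in isolation (the meta string is illustrative):

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

func main() {
	delayRegex := regexp.MustCompile(`delayed=(\d+\.\d+)`)

	meta := "delayed=11.2ms&retry=1&workflow=f1.toml"
	if s := delayRegex.FindStringSubmatch(meta); len(s) > 1 {
		meta = strings.Replace(meta, s[1], "XX", 1)
	}
	fmt.Println(meta) // delayed=XXms&retry=1&workflow=f1.toml
}
```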
@@ -62,7 +70,7 @@
Type: "task1",
Info: "?date=2019-12-12",
ID: "UUID_task1_attempt0",
Meta: "retry=1&workflow=f1.toml"},
Meta: "delayed=XXms&retry=1&workflow=f1.toml"},
},
},
"task1 attempt 2": {
Expand All @@ -77,7 +85,7 @@ func TestTaskMaster_Process(t *testing.T) {
Type: "task1",
Info: "?date=2019-12-12",
ID: "UUID_task1_attempt2",
Meta: "retry=3&workflow=f1.toml"},
Meta: "delayed=XXms&retry=3&workflow=f1.toml"},
},
},
"task1 no retry": {
10 changes: 7 additions & 3 deletions apps/workers/sql-load/worker.go
@@ -132,12 +132,12 @@ func (w *worker) DoTask(ctx context.Context) (task.Result, string) {
queryChan := make(chan string, 10)
go CreateInserts(rowChan, queryChan, tempTable, w.ds.insertCols, w.Params.BatchSize)

first := true
tableCreated := false
// load data into temp table
for s := range queryChan {
if first {
if !tableCreated {
s = createTempTable + s
first = false
tableCreated = true
}
if _, err := w.sqlDB.ExecContext(ctx, s); err != nil {
cancelFn()
@@ -149,6 +149,10 @@
return task.Failed(w.ds.err)
}

if !tableCreated {
return task.Completed("no data to load for %s", w.Params.Table)
}

//finalize and transfer data
var txErr error
var tx *sql.Tx
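The sql-load fix distills to a common channel pattern: fold the DDL into the first batch so the temp table is only created when data actually arrived, then short-circuit when nothing was read. A self-contained sketch of that pattern (`exec` stands in for the worker's database handle):

```go
package main

import "fmt"

// loadBatches drains queryChan, creating the destination together with
// the first batch and reporting cleanly when there were no rows at all.
func loadBatches(ddl string, queryChan <-chan string, exec func(string) error) (string, error) {
	created := false
	for q := range queryChan {
		if !created {
			q = ddl + q
			created = true
		}
		if err := exec(q); err != nil {
			return "", err
		}
	}
	if !created {
		return "no data to load", nil
	}
	return "loaded", nil
}

func main() {
	ch := make(chan string)
	close(ch) // zero rows: the channel yields nothing

	msg, _ := loadBatches("CREATE TEMP TABLE t (x int);", ch, func(q string) error {
		fmt.Println("exec:", q)
		return nil
	})
	fmt.Println(msg) // no data to load
}
```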
4 changes: 2 additions & 2 deletions internal/test/workflow/f1.toml
@@ -1,6 +1,6 @@
[[Phase]]
task = "task1"
rule = "cron=0 * * * *&offset=-4h&job=t2"
rule = "cron=0 * * * *&offset=-4h&job=t2&retry_delay=10ms"
retry = 3
template = "?date={yyyy}-{mm}-{dd}T{hh}"

@@ -33,7 +33,7 @@ template = "?date={yyyy}-{mm}-{dd}T{hh}"

[[Phase]]
task = "task5"
rule = "job=t5"
rule = "job=t5&retry_delay=10ms"
retry = 3
template = "?year={yyyy}"
DependsOn = "task1:t2"
2 changes: 1 addition & 1 deletion retry/retry.go
@@ -229,7 +229,7 @@ func (r *Retryer) close() error {
return nil
}

// genJitter will return a time.Duration representing extra
// jitterPercent will return a time.Duration representing extra
// 'jitter' to be added to the wait time. Jitter is important
// in retry events since the original cause of failure can be
// due to too many jobs being processed at a time.
8 changes: 5 additions & 3 deletions workflow/workflow.go
@@ -285,12 +285,14 @@ func (c *Cache) loadFile(path string, opts *file.Options) (f string, err error)
if err != nil {
return "", errors.Wrapf(err, "read-all: %s", path)
}

if _, err := toml.Decode(string(b), &data); err != nil {
d := Workflow{
Checksum: data.Checksum,
}
if _, err := toml.Decode(string(b), &d); err != nil {
return "", errors.Wrapf(err, "decode: %s", string(b))
}

c.Workflows[f] = data
c.Workflows[f] = d

return f, nil
}
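The hot-load bug comes from how TOML decoding treats a struct that already holds values: only keys present in the new file are overwritten, so anything the edited workflow dropped would silently survive a reload. A minimal illustration (hypothetical config type, not the project's Workflow):

```go
package main

import (
	"fmt"

	"github.com/BurntSushi/toml"
)

type config struct {
	Cron  string
	Retry int
}

func main() {
	var c config
	toml.Decode("cron = '0 * * * *'\nretry = 3", &c)

	// Hot reload: the edited file no longer sets retry.
	toml.Decode("cron = '30 * * * *'", &c)

	fmt.Printf("%+v\n", c) // stale Retry survives: {Cron:30 * * * * Retry:3}
}
```

Decoding into a fresh Workflow seeded only with the checksum, as loadFile now does, avoids this entirely.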
2 changes: 1 addition & 1 deletion workflow/workflow_test.go
@@ -26,7 +26,7 @@ func TestLoadFile(t *testing.T) {
cases := trial.Cases{
"read file": {
Input: input{path: "../internal/test/workflow/f1.toml"},
Expected: "71c745e929d87c1448e516877acbc86f", // checksum of test file
Expected: "4422274d9c9f7e987c609687a7702651", // checksum of test file
},
"stat error": {
Input: input{path: "nop://stat_err"},
