diff --git a/apps/taskmasters/flowlord/README.md b/apps/taskmasters/flowlord/README.md
index 0e1c342f..5edc981d 100644
--- a/apps/taskmasters/flowlord/README.md
+++ b/apps/taskmasters/flowlord/README.md
@@ -17,23 +17,24 @@ template = "?date={yyyy}-{mm}-{dd}"
 [[Phase]]
 task = "task1"
 dependsOn = "topic_name"
-rule = ""
+rule = "retry_delay=30s"
 retry = 3
 template = "?hour={yyyy}-{mm}-{dd}T{hh}"
 ```
 
 ### Phase
- - task: the name of the topic this task will be sent to. It is also the unique name of the task. In Addition a job of a task can be added in the task name using a colon (:) as a separator (task:job)
- - dependsOn:
+ - **task**: the name of the topic this task will be sent to. It is also the unique name of the task. In addition, a job can be added to the task name using a colon (:) as a separator (task:job)
+ - **dependsOn**:
   - the name of the parent task
   - this task will start after the parent task has completed successfully
   - if left blank this tasks will only start based on the rule
- - rule: rules on about the tasks that are encoded as query params
-   - cron: schedule the task based on the cron pattern (see scheduling)
-   - require: used in a child task saying to only start task if value is present
- - retry: the number of times a task is retried before being sent to failed_tasks
- - template: a URL string that is parsed and put into the task's info string when created
+ - **rule**: rules about the task, encoded as query params
+   - _cron_: schedule the task based on the cron pattern (see scheduling)
+   - _require_: used in a child task to only start the task if the required value is present
+   - _retry_delay_: duration to wait before retrying a failed task
+ - **retry**: the number of times a task is retried before being sent to failed_tasks
+ - **template**: a URL string that is parsed and put into the task's info string when created
 
 ### Template
 templating is used to create dynamic tasks based on the time run or previous jobs run.
 templates are designated with surrounding brackets `{}`
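The `rule` value above is ordinary URL query encoding, so `retry_delay` simply rides alongside existing keys such as `cron` and `job`. A minimal standalone sketch of how such a rule is read, mirroring the `url.ParseQuery` and `time.ParseDuration` calls used by the taskmaster below (the `main` wrapper and example output are illustrative only):

```go
package main

import (
	"fmt"
	"net/url"
	"time"
)

func main() {
	// A rule as it appears in a workflow phase (see f1.toml below).
	rule := "cron=0 * * * *&offset=-4h&job=t2&retry_delay=10ms"

	// Rules are query-encoded key/value pairs.
	values, err := url.ParseQuery(rule)
	if err != nil {
		panic(err)
	}

	// retry_delay is parsed as a standard Go duration string.
	delay, err := time.ParseDuration(values.Get("retry_delay"))
	if err != nil {
		panic(err)
	}

	fmt.Println("cron:", values.Get("cron")) // 0 * * * *
	fmt.Println("retry delay:", delay)       // 10ms
}
```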
diff --git a/apps/taskmasters/flowlord/taskmaster.go b/apps/taskmasters/flowlord/taskmaster.go
index 14695bfc..bf69f3ba 100644
--- a/apps/taskmasters/flowlord/taskmaster.go
+++ b/apps/taskmasters/flowlord/taskmaster.go
@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"log"
+	"math/rand"
 	"net/url"
 	"regexp"
 	"strconv"
@@ -24,6 +25,10 @@ import (
 	"github.com/pcelvng/task-tools/workflow"
 )
 
+func init() {
+	rand.Seed(time.Now().Unix())
+}
+
 type taskMaster struct {
 	initTime   time.Time
 	nextUpdate time.Time
@@ -224,9 +229,11 @@ func (tm *taskMaster) schedule() (err error) {
 // Send retry failed tasks to tm.failedTopic (only if the phase exists in the workflow)
 func (tm *taskMaster) Process(t *task.Task) error {
 	meta, _ := url.ParseQuery(t.Meta)
-	// attempt to return
+
+	// attempt to retry
 	if t.Result == task.ErrResult {
 		p := tm.Get(*t)
+		rules, _ := url.ParseQuery(p.Rule)
 
 		r := meta.Get("retry")
 		i, _ := strconv.Atoi(r)
@@ -235,13 +242,23 @@ func (tm *taskMaster) Process(t *task.Task) error {
 			return nil
 		}
 		if p.Retry > i {
+			var delay time.Duration
+			if s := rules.Get("retry_delay"); s != "" {
+				delay, _ = time.ParseDuration(s)
+				delay = delay + jitterPercent(delay, 40)
+				meta.Set("delayed", gtools.PrintDuration(delay))
+			}
 			t = task.NewWithID(t.Type, t.Info, t.ID)
 			i++
 			meta.Set("retry", strconv.Itoa(i))
 			t.Meta = meta.Encode()
-			if err := tm.producer.Send(t.Type, t.JSONBytes()); err != nil {
-				return err
-			}
+			go func() {
+				time.Sleep(delay)
+				if err := tm.producer.Send(t.Type, t.JSONBytes()); err != nil {
+					log.Println(err)
+				}
+			}()
+			return nil
 		} else if tm.failedTopic != "-" {
 			// send to the retry failed topic if retries > p.Retry
 			meta.Set("retry", "failed")
@@ -321,3 +338,25 @@ func (tm *taskMaster) read(ctx context.Context) {
 		}
 	}
 }
+
+// jitterPercent will return a time.Duration representing extra
+// 'jitter' to be added to the wait time. Jitter is important
+// in retry events since the original cause of failure can be
+// due to too many jobs being processed at a time.
+//
+// By adding some jitter the retry events won't all happen
+// at once but will get staggered to prevent the problem
+// from happening again.
+//
+// 'p' is a percentage of the wait time. Duration returned
+// is a random duration between 0 and p. 'p' should be a value
+// between 0-100.
+func jitterPercent(wait time.Duration, p int64) time.Duration {
+	// p == 40
+	if wait == 0 {
+		return 0
+	}
+	maxJitter := (int64(wait) * p) / 100
+
+	return time.Duration(rand.Int63n(maxJitter))
+}
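The retry path above sleeps for the configured `retry_delay` plus up to 40% random jitter before re-producing the task, so tasks that failed together are not all re-sent at the same instant. A self-contained sketch of that calculation (the helper is copied from the new `jitterPercent` above; the `main` wrapper and 30s base delay are illustrative only):

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// jitterPercent returns a random extra duration between 0 and p percent
// of wait (copied from the jitterPercent helper added above).
func jitterPercent(wait time.Duration, p int64) time.Duration {
	if wait == 0 {
		return 0
	}
	maxJitter := (int64(wait) * p) / 100
	return time.Duration(rand.Int63n(maxJitter))
}

func main() {
	base := 30 * time.Second // e.g. rule = "retry_delay=30s"

	// The retry path waits for the configured delay plus up to 40% jitter,
	// so the effective wait lands somewhere in [30s, 42s).
	delay := base + jitterPercent(base, 40)
	fmt.Println("retrying after", delay)
}
```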
diff --git a/apps/taskmasters/flowlord/taskmaster_test.go b/apps/taskmasters/flowlord/taskmaster_test.go
index 2b77424f..e7671b08 100644
--- a/apps/taskmasters/flowlord/taskmaster_test.go
+++ b/apps/taskmasters/flowlord/taskmaster_test.go
@@ -2,8 +2,11 @@ package main
 
 import (
 	"encoding/json"
+	"regexp"
 	"sort"
+	"strings"
 	"testing"
+	"time"
 
 	"github.com/hydronica/trial"
 	"github.com/pcelvng/task"
@@ -13,6 +16,7 @@ import (
 )
 func TestTaskMaster_Process(t *testing.T) {
+	delayRegex := regexp.MustCompile(`delayed=(\d+.\d+)`)
 	cache, err := workflow.New("../../../internal/test/workflow/f1.toml", nil)
 	if err != nil {
 		t.Fatal("cache init", err)
 	}
@@ -31,6 +35,7 @@ func TestTaskMaster_Process(t *testing.T) {
 	tm.producer = producer
 	nop.FakeMsg = tsk.JSONBytes()
 	err = tm.Process(&tsk)
+	time.Sleep(100 * time.Millisecond)
 	result := make([]task.Task, 0)
 	for _, msgs := range producer.Messages {
 		for _, msg := range msgs {
@@ -39,6 +44,9 @@ func TestTaskMaster_Process(t *testing.T) {
 				return nil, err
 			}
 			v.Created = ""
+			if s := delayRegex.FindStringSubmatch(v.Meta); len(s) > 1 {
+				v.Meta = strings.Replace(v.Meta, s[1], "XX", 1)
+			}
 			result = append(result, v)
 		}
 	}
@@ -62,7 +70,7 @@ func TestTaskMaster_Process(t *testing.T) {
 					Type: "task1",
 					Info: "?date=2019-12-12",
 					ID:   "UUID_task1_attempt0",
-					Meta: "retry=1&workflow=f1.toml"},
+					Meta: "delayed=XXms&retry=1&workflow=f1.toml"},
 			},
 		},
 		"task1 attempt 2": {
@@ -77,7 +85,7 @@ func TestTaskMaster_Process(t *testing.T) {
 					Type: "task1",
 					Info: "?date=2019-12-12",
 					ID:   "UUID_task1_attempt2",
-					Meta: "retry=3&workflow=f1.toml"},
+					Meta: "delayed=XXms&retry=3&workflow=f1.toml"},
 			},
 		},
 		"task1 no retry": {
diff --git a/apps/workers/sql-load/worker.go b/apps/workers/sql-load/worker.go
index a3d308f3..9fa8b3c6 100644
--- a/apps/workers/sql-load/worker.go
+++ b/apps/workers/sql-load/worker.go
@@ -132,12 +132,12 @@ func (w *worker) DoTask(ctx context.Context) (task.Result, string) {
 	queryChan := make(chan string, 10)
 	go CreateInserts(rowChan, queryChan, tempTable, w.ds.insertCols, w.Params.BatchSize)
 
-	first := true
+	tableCreated := false
 	// load data into temp table
 	for s := range queryChan {
-		if first {
+		if !tableCreated {
 			s = createTempTable + s
-			first = false
+			tableCreated = true
 		}
 		if _, err := w.sqlDB.ExecContext(ctx, s); err != nil {
 			cancelFn()
@@ -149,6 +149,10 @@ func (w *worker) DoTask(ctx context.Context) (task.Result, string) {
 		return task.Failed(w.ds.err)
 	}
 
+	if !tableCreated {
+		return task.Completed("no data to load for %s", w.Params.Table)
+	}
+
 	//finalize and transfer data
 	var txErr error
 	var tx *sql.Tx
diff --git a/internal/test/workflow/f1.toml b/internal/test/workflow/f1.toml
index 096a8d6d..372ad13b 100644
--- a/internal/test/workflow/f1.toml
+++ b/internal/test/workflow/f1.toml
@@ -1,6 +1,6 @@
 [[Phase]]
 task = "task1"
-rule = "cron=0 * * * *&offset=-4h&job=t2"
+rule = "cron=0 * * * *&offset=-4h&job=t2&retry_delay=10ms"
 retry = 3
 template = "?date={yyyy}-{mm}-{dd}T{hh}"
 
@@ -33,7 +33,7 @@ template = "?date={yyyy}-{mm}-{dd}T{hh}"
 
 [[Phase]]
 task = "task5"
-rule = "job=t5"
+rule = "job=t5&retry_delay=10ms"
 retry = 3
 template = "?year={yyyy}"
 DependsOn = "task1:t2"
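Because the retry delay now includes random jitter, the `delayed=` value written into task meta is nondeterministic; the test changes above therefore mask the number before comparing against the fixed `delayed=XXms&...` expectations, and the 10ms `retry_delay` added to f1.toml keeps the delayed send inside the test's 100ms sleep. A standalone sketch of that masking step (the regex is taken from the test; the `main` wrapper and sample meta value are illustrative):

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

func main() {
	// A meta string as a delayed retry might produce it.
	meta := "delayed=12.345ms&retry=1&workflow=f1.toml"

	// Same pattern the test compiles: capture the numeric part of the delay.
	delayRegex := regexp.MustCompile(`delayed=(\d+.\d+)`)

	// Replace the captured, nondeterministic number with a fixed placeholder
	// so the meta can be compared against a static expected value.
	if s := delayRegex.FindStringSubmatch(meta); len(s) > 1 {
		meta = strings.Replace(meta, s[1], "XX", 1)
	}

	fmt.Println(meta) // delayed=XXms&retry=1&workflow=f1.toml
}
```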
diff --git a/retry/retry.go b/retry/retry.go
index 03d4862d..aea8e272 100644
--- a/retry/retry.go
+++ b/retry/retry.go
@@ -229,7 +229,7 @@ func (r *Retryer) close() error {
 	return nil
 }
 
-// genJitter will return a time.Duration representing extra
+// jitterPercent will return a time.Duration representing extra
 // 'jitter' to be added to the wait time. Jitter is important
 // in retry events since the original cause of failure can be
 // due to too many jobs being processed at a time.
diff --git a/workflow/workflow.go b/workflow/workflow.go
index 8daa3179..55ef9951 100644
--- a/workflow/workflow.go
+++ b/workflow/workflow.go
@@ -285,12 +285,14 @@ func (c *Cache) loadFile(path string, opts *file.Options) (f string, err error)
 	if err != nil {
 		return "", errors.Wrapf(err, "read-all: %s", path)
 	}
-
-	if _, err := toml.Decode(string(b), &data); err != nil {
+	d := Workflow{
+		Checksum: data.Checksum,
+	}
+	if _, err := toml.Decode(string(b), &d); err != nil {
 		return "", errors.Wrapf(err, "decode: %s", string(b))
 	}
 
-	c.Workflows[f] = data
+	c.Workflows[f] = d
 	return f, nil
 }
 
diff --git a/workflow/workflow_test.go b/workflow/workflow_test.go
index 1a092ab0..b3132a2c 100644
--- a/workflow/workflow_test.go
+++ b/workflow/workflow_test.go
@@ -26,7 +26,7 @@ func TestLoadFile(t *testing.T) {
 	cases := trial.Cases{
 		"read file": {
 			Input:    input{path: "../internal/test/workflow/f1.toml"},
-			Expected: "71c745e929d87c1448e516877acbc86f", // checksum of test file
+			Expected: "4422274d9c9f7e987c609687a7702651", // checksum of test file
 		},
 		"stat error": {
 			Input: input{path: "nop://stat_err"},
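In the workflow.go change above, `loadFile` now decodes each file into a fresh `Workflow` value that carries over only the previously computed checksum, and caches that value, rather than decoding into and storing the shared `data` value. A minimal sketch of that decode-into-a-fresh-value pattern (the stand-in `Workflow`/`Phase` fields and the BurntSushi toml import are assumptions based on the `toml.Decode(string, &v)` signature used here, not the package's real definitions):

```go
package main

import (
	"fmt"

	"github.com/BurntSushi/toml" // assumed; matches the toml.Decode(string, &v) signature in loadFile
)

// Minimal stand-ins for the workflow package types (fields are illustrative).
type Phase struct {
	Task string
	Rule string
}

type Workflow struct {
	Checksum string
	Phases   []Phase `toml:"Phase"`
}

func main() {
	raw := `
[[Phase]]
task = "task1"
rule = "retry_delay=10ms"
`
	// Pretend this checksum was computed earlier in loadFile, before decoding.
	checksum := "4422274d9c9f7e987c609687a7702651"

	// Decode into a fresh value that only carries the checksum forward,
	// rather than reusing a shared struct across loads.
	d := Workflow{Checksum: checksum}
	if _, err := toml.Decode(raw, &d); err != nil {
		panic(err)
	}

	fmt.Printf("%s has %d phase(s)\n", d.Checksum, len(d.Phases))
}
```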