Docs (#218)
* - don't send files to taskTopic if blank
- allow no lookup window

* update documentation
jbsmith7741 authored Nov 30, 2024
1 parent f1320e7 commit 2a05d43
Showing 5 changed files with 162 additions and 20 deletions.
28 changes: 26 additions & 2 deletions apps/flowlord/README.md
@@ -87,7 +87,7 @@ template = "?date={yyyy}-{mm}-{dd}T{hh}"
### files
Schedule a task after a specified file is written. This should be used with the filewatcher taskmaster or a GCP file-watching service. File matching is done using the stdlib `filepath.Match`, which does not support `**` matching. Flowlord will attempt to pull the timestamp from the filepath and use it to populate the date-time in the phase's template (`{yyyy}|{dd}|{mm}|{hh}`). The matching file can be referenced in the phase's template with `{meta:file}`. The filename can be referenced with `{meta:filename}`.
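
The `filepath.Match` behavior noted above is easy to verify in isolation; a minimal Go sketch (illustrative only, not part of this change):

``` go
package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	// A single `*` only matches within one path segment.
	ok, _ := filepath.Match("/folder/*/*/*.txt", "/folder/2024/11/data.txt")
	fmt.Println(ok) // true

	// `**` is not a recursive wildcard; it behaves like a single `*`,
	// so it cannot span the extra directory level here.
	ok, _ = filepath.Match("/folder/**/*.txt", "/folder/2024/11/data.txt")
	fmt.Println(ok) // false
}
```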

```
``` toml
[[Phase]]
task = "topic"
rule = "files=/folder/*/*/*.txt"
@@ -98,7 +98,7 @@ template = "{meta:file}?opt=true"

Used to indicate a required field or value before starting a child process.

```
``` toml
[[Phase]]
task = "child:job"
rule = "require:{meta:file}"
@@ -115,3 +115,27 @@ batching is a way to create multiple tasks when the phase is run. This can be do
* meta: comma-separated data associated with a key. Each item will generate a new task. Ex: meta=key:a,b,c|key2:1,2,3
* meta_file: a line-delimited data file. Each row (line) will generate a new task (see the file sketch after the examples below).

``` toml
# run every day at 2:01:00 for multiple items
# generates 3 tasks with the info of
# ?name=a&value=1 AND ?name=b&value=2 AND ?name=c&value=3
[[Phase]]
task="worker:job-meta"
rule="cron=0 1 2 * * *&meta=name:a,b,c|value:1,2,3"
template="?name={meta:name}&value={meta:value}&day={yyyy}-{mm}-{dd}"

# run every day at 5:05:00
# generates a task for every line in the file
[[Phase]]
task="worker:job-file"
rule="cron=0 5 5 * * *&meta_file=data.json"
template="?name={meta:name}&value={meta:value}&day={yyyy}-{mm}-{dd}"

# run every day at 7:07:00 for the last week
# generates 8 tasks from today to 7 days ago
[[Phase]]
task="worker:lastweek"
rule="cron=0 7 7 * * *&for=-168h&by=day"
template="?day={yyyy}-{mm}-{dd}"
```
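
For the `meta_file` phase above, each line of `data.json` is assumed to be one record whose fields become `{meta:...}` values; a hypothetical file matching the template's `{meta:name}` and `{meta:value}` references:

```
{"name": "a", "value": "1"}
{"name": "b", "value": "2"}
{"name": "c", "value": "3"}
```

Each line would then produce a task info like `?name=a&value=1&day=2024-01-13`, with the date filled from the scheduled run time.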
12 changes: 12 additions & 0 deletions apps/flowlord/taskmaster_test.go
@@ -368,6 +368,18 @@ func TestTaskMaster_Batch(t *testing.T) {
{Type: "batch-date", Info: "?day=2024-01-13", Meta: "cron=2024-01-13T00&workflow=batch.toml"},
},
},
"for-48h +offset": {
Input: workflow.Phase{
Task: "batch-date",
Rule: "for=-48h&offset=-48h",
Template: "?day={yyyy}-{mm}-{dd}",
},
Expected: []task.Task{
{Type: "batch-date", Info: "?day=2024-01-13", Meta: "cron=2024-01-13T00&workflow=batch.toml"},
{Type: "batch-date", Info: "?day=2024-01-12", Meta: "cron=2024-01-12T00&workflow=batch.toml"},
{Type: "batch-date", Info: "?day=2024-01-11", Meta: "cron=2024-01-11T00&workflow=batch.toml"},
},
},
"metas": {
Input: workflow.Phase{
Task: "meta-batch",
24 changes: 14 additions & 10 deletions apps/utils/filewatcher/main.go
@@ -8,9 +8,10 @@ import (
"syscall"

"github.com/jbsmith7741/go-tools/appenderr"
"github.com/pcelvng/task/bus"

tools "github.com/pcelvng/task-tools"
"github.com/pcelvng/task-tools/bootstrap"
"github.com/pcelvng/task/bus"
)

const (
@@ -28,11 +29,13 @@ example rule:
type options struct {
Bus *bus.Options `toml:"bus"`

AccessKey string `toml:"access_key" desc:"secret token for S3/GCS access "`
SecretKey string `toml:"secret_key" desc:"secret key for S3/GCS access "`
FilesTopic string `toml:"files_topic" desc:"topic override (default is files) disable with -"`
TaskTopic string `toml:"task_topic" desc:"topic to send new task"`
Rules []*Rule `toml:"rule"`
FilesTopic string `toml:"files_topic" desc:"topic override (default is files) disable with -"`
TaskTopic string `toml:"task_topic" desc:"topic to send new task"`

AccessKey string `toml:"access_key" desc:"secret token for S3/GCS access "`
SecretKey string `toml:"secret_key" desc:"secret key for S3/GCS access "`

Rules []*Rule `toml:"rule"`
}

type Rule struct {
@@ -45,7 +48,7 @@ type Rule struct {
func (o options) Validate() error {
errs := appenderr.New()
if o.AccessKey == "" || o.SecretKey == "" {
log.Println("AWS Credentials are blank")
log.Println("File credentials are blank")
}
if len(o.Rules) == 0 {
errs.Add(errors.New("at least one rule is required"))
@@ -62,14 +65,15 @@ func main() {
FilesTopic: "files",
Rules: []*Rule{
{
HourLookback: 24,
HourLookback: defaultLookback,
PathTemplate: "gs://folder/{HOUR_SLUG}/*.json",
Frequency: "1h",
TaskTemplate: "{FILE_PATH}?&param=other-param&dest=gs://folder/{HOUR_SLUG}/file.json",
Frequency: defaultFrequency,
TaskTemplate: "",
},
},
}

log.SetFlags(log.LstdFlags | log.Lshortfile)
bootstrap.NewUtility(appName, opt).
Description(description).
Version(tools.String()).Initialize()
14 changes: 6 additions & 8 deletions apps/utils/filewatcher/watcher.go
@@ -8,10 +8,11 @@ import (
"github.com/dustinevan/chron"
jsoniter "github.com/json-iterator/go"
"github.com/pcelvng/task"
"github.com/pcelvng/task/bus"

"github.com/pcelvng/task-tools/file"
"github.com/pcelvng/task-tools/file/stat"
"github.com/pcelvng/task-tools/tmpl"
"github.com/pcelvng/task/bus"
)

var (
@@ -43,10 +44,6 @@ func newWatchers(appOpt *options) (watchers []*watcher, err error) {
}

for _, r := range appOpt.Rules {
if r.HourLookback == 0 {
r.HourLookback = defaultLookback
}

if r.Frequency == "" {
r.Frequency = defaultFrequency
}
@@ -144,10 +141,10 @@ func (w watcher) currentFiles(paths ...string) fileList {
SecretKey: w.appOpt.SecretKey,
})
if err != nil {
log.Println(err)
log.Printf("issue listing %v: %v", p, err)
continue
}
// iterate over the list to setup the new complete fileList
// iterate over the list to set up the new complete fileList
for i := range list {
if list[i].IsDir {
continue
@@ -170,12 +167,13 @@ func (w *watcher) sendFiles(files fileList) {
w.producer.Send(w.appOpt.FilesTopic, b)
}

if w.appOpt.TaskTopic != "" {
if w.appOpt.TaskTopic != "" && w.rule.TaskTemplate != "" {
t := tmpl.PathTime(f.Path)
info := tmpl.Parse(w.rule.TaskTemplate, t)
info = strings.Replace(info, "{WATCH_FILE}", f.Path, -1)

tsk := task.New(w.appOpt.TaskTopic, info)
tsk.Job = "filewatcher"
meta := task.NewMeta()
meta.SetMeta("job", "filewatcher")
tsk.Meta = meta.GetMeta().Encode()
104 changes: 104 additions & 0 deletions internal/Docs/PG_Tables.md
@@ -0,0 +1,104 @@
# Postgres Tables
Suggested tables for tracking and storing processed task records.

### Required Functions

``` sql
-- parse_param converts url query params (without a leading ?) into a jsonb object of key/value pairs
CREATE OR REPLACE FUNCTION parse_param(url text)
RETURNS jsonb AS $$
DECLARE
result jsonb := '{}';
param text;
key text;
value text;
BEGIN
FOREACH param IN ARRAY regexp_split_to_array(url, '&')
LOOP
key := split_part(param, '=', 1);
value := coalesce(split_part(param, '=', 2),'');
if key is not null and key != '' then
result := jsonb_set(result, ARRAY [key], to_jsonb(value), true);
end if;
END LOOP;

RETURN result;
END;
$$ LANGUAGE plpgsql;

-- parse_url takes a url value and extracts the origin and the query params into a jsonb object
CREATE OR REPLACE FUNCTION parse_url(url text)
RETURNS jsonb AS $$
DECLARE
result jsonb := '{}';
query text;
origin text;
BEGIN
-- Construct origin
origin := coalesce(substring(url from '^((?:([^:/?#]+)://([^/?#]*))?([^?#]*))'), '');
result := jsonb_set(result, '{origin}', to_jsonb(origin), true);

-- Extract query parameters
query := substring(url from '\?.*$');
IF query IS NOT NULL THEN
query := substring(query from 2); -- Remove the leading '?'
result := result || parse_param(query);
END IF;

RETURN result;
END;
$$ LANGUAGE plpgsql;
```
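
A quick sanity check of the two functions (expected output shown in comments; exact jsonb key ordering may vary):

``` sql
select parse_param('name=a&value=1');
-- {"name": "a", "value": "1"}

select parse_url('gs://bucket/path/file.json?day=2024-01-13&hour=05');
-- {"day": "2024-01-13", "hour": "05", "origin": "gs://bucket/path/file.json"}
```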

## Task-logs
Used to store completed (done) tasks.

``` sql
create table public.task_log
(
id text,
type text,
job text,
info text,
result text,
meta text,
msg text,
created timestamp,
started timestamp,
ended timestamp
);

create index task_log_created
on public.task_log (created);

create index task_log_started
on public.task_log (started);

create index task_log_type
on public.task_log (type);

create index task_log_job
on public.task_log (job);
```
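
A hypothetical row, to show how a finished task maps onto the columns (all values made up; `result` is assumed to hold something like `complete` or `error`):

``` sql
insert into public.task_log (id, type, job, info, result, meta, msg, created, started, ended)
values ('8c2f6c1e', 'worker', 'job-meta', '?name=a&value=1&day=2024-01-13', 'complete',
        'cron=2024-01-13T00&workflow=batch.toml', '',
        '2024-01-13 02:01:00', '2024-01-13 02:01:05', '2024-01-13 02:01:30');
```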

## Task View
A user-friendly view of task_log that adds the time to complete each task (_**task_seconds**_ and _**task_time**_) along with parsed url values of _**info**_ and _**meta**_ (_**infoJSON**_ and _**metaJSON**_).

``` sql
create or replace view public.tasks (id, type, job, info, infoJSON, meta, metaJSON, msg, result, task_seconds, task_time, created, started, ended) as
select task_log.id,
task_log.type,
task_log.job,
task_log.info,
parse_url(task_log.info),
task_log.meta,
parse_param(task_log.meta),
task_log.msg,
task_log.result,
date_part('epoch'::text, task_log.ended - task_log.started) as task_seconds,
to_char(task_log.ended - task_log.started, 'HH24:MI:SS'::text) as task_time,
task_log.created,
task_log.started,
task_log.ended
from task_log;
```
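
For example, recent failures and average runtime per task type (assuming `result` holds values like `complete`/`error`; the time windows are arbitrary):

``` sql
-- tasks from the last day that did not complete
select type, job, info, msg, task_time
from public.tasks
where result != 'complete'
  and created > now() - interval '1 day'
order by created desc;

-- average runtime per task type over the last week
select type, round(avg(task_seconds)) as avg_seconds, count(*) as runs
from public.tasks
where created > now() - interval '7 days'
group by type
order by avg_seconds desc;
```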
