add globreader and log-proc worker (#140)
* add globreader util and log-proc worker

* unit test fix

* add folder support to glob function

* rename log-proc to transform

* update nop reader to return stat with filename

* transform unit tests

* flowlord shows all schedule crons with offset

* unit test fixes
jbsmith7741 authored Jan 13, 2021
1 parent 8c5e7f5 commit 9d1b380
Showing 13 changed files with 646 additions and 167 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
@@ -5,7 +5,7 @@ RUN apk add ca-certificates
RUN apk add curl

#confd
-ADD https://github.com/kelseyhightower/confd/releases/download/v0.16.0/confd-0.16.0-linux-amd64 /usr/bin/confd
+ADD https://github.com/jbsmith7741/confd/releases/download/v0.16.0-vault/confd-0.16.0-vault-darwin-amd64 /usr/bin/confd
RUN chmod +x /usr/bin/confd

#gojq
@@ -18,4 +18,4 @@ RUN rm -rf /tmp/*
RUN echo -e "#!/bin/sh \n ls -Alhp \$1" > /usr/bin/ll
RUN chmod +x /usr/bin/ll

-COPY build/ /usr/bin/
+COPY build/* /usr/bin/
16 changes: 14 additions & 2 deletions apps/taskmasters/flowlord/taskmaster.go
Expand Up @@ -49,7 +49,7 @@ type stats struct {
type cEntry struct {
Next time.Time
Prev time.Time
-Schedule string
+Schedule []string
Child []string `json:"Child,omitempty"`
}

@@ -94,10 +94,22 @@ func (tm *taskMaster) Info() interface{} {
ent := cEntry{
Next: e.Next,
Prev: e.Prev,
-Schedule: j.Schedule,
+Schedule: []string{j.Schedule + "?offset=" + gtools.PrintDuration(j.Offset)},
Child: make([]string, 0),
}
k := j.Topic + ":" + j.Name

+// check for multi-scheduled entries
+if e, found := sts.Entries[k]; found {
+if e.Prev.After(ent.Prev) {
+ent.Prev = e.Prev // keep the most recent run time
+}
+if e.Next.Before(ent.Next) {
+ent.Next = e.Next // keep the soonest upcoming run time
+}
+ent.Schedule = append(ent.Schedule, e.Schedule...)
+}
+// add children
ent.Child = tm.getAllChildren(j.Topic, j.Workflow, j.Name)
sts.Entries[k] = ent
}
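
Illustratively, a job registered under more than one cron now shows up in flowlord's info output as a single entry whose Schedule field lists every cron with its offset appended, with Prev/Next folded to the most recent and soonest times. The entry below is hypothetical (cron strings, offset format, and child name are invented for the example):

{
  "Next": "2021-01-14T00:15:00Z",
  "Prev": "2021-01-13T12:15:00Z",
  "Schedule": ["0 15 0 * * *?offset=-4h", "0 15 12 * * *?offset=-4h"],
  "Child": ["sort2file:hourly"]
}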
6 changes: 3 additions & 3 deletions apps/workers/sort2file/worker_test.go
@@ -372,16 +372,16 @@ func ExampleDoTaskReadLineErr() {
defer os.Unsetenv("TZ")

ctx, _ := context.WithCancel(context.Background())
-info := `nop://readline_err/?date-field=0&dest-template=nop://{HH}.csv&sep=,`
+info := `nop://readline_err?date-field=0&dest-template=nop://{HH}.csv&sep=,`
wkr := newWorker(info)
result, msg := wkr.DoTask(ctx)

fmt.Println(result) // output: error
-fmt.Println(msg) // output: issue at line 1: readline_err (nop://readline_err/)
+fmt.Println(msg) // output: issue at line 1: readline_err (nop://readline_err)

// Output:
// error
-// issue at line 1: readline_err (nop://readline_err/)
+// issue at line 1: readline_err (nop://readline_err)
}

func ExampleWorker_DoTaskDirSrc() {
172 changes: 172 additions & 0 deletions apps/workers/transform/main.go
@@ -0,0 +1,172 @@
package main

import (
"context"
// "encoding/json"
"io/ioutil"
"log"
"sync"

"github.com/dustin/go-humanize"
"github.com/itchyny/gojq"
"github.com/jbsmith7741/uri"
jsoniter "github.com/json-iterator/go"
"github.com/pcelvng/task"

tools "github.com/pcelvng/task-tools"
"github.com/pcelvng/task-tools/bootstrap"
"github.com/pcelvng/task-tools/file"
)

const (
taskType = "transform"
description = `modify json data using jq syntax
info params:
- origin: (required) - glob path to the file(s) to transform (extract)
- dest: (required) - file path to where the resulting data will be written
- jq: (required) - file path to a jq definition file
- threads: number of threads used to process files (default: 2)
example
{"task":"transform","info":"gs://path/to/file/*/*.gz?dest=gs://path/dest/output.gz&jq=./conf.jq"}`
)

type options struct {
File file.Options
}

func main() {
opts := &options{}

app := bootstrap.NewWorkerApp(taskType, opts.newWorker, opts).
Version(tools.String()).
Description(description).
Initialize()

app.Run()
}

func (o *options) newWorker(info string) task.Worker {
w := &worker{
options: *o,
}

if err := uri.Unmarshal(info, w); err != nil {
return task.InvalidWorker("uri error: %s", err)
}

if w.Threads < 1 {
return task.InvalidWorker("invalid threads %d (min: 1)", w.Threads)
}

jqreader, err := file.NewReader(w.JqConfig, &o.File)
if err != nil {
return task.InvalidWorker("jq config: %s", err)
}
jqlogic, err := ioutil.ReadAll(jqreader)
if err != nil {
return task.InvalidWorker("jq config read: %s", err)
}

if w.reader, err = file.NewGlobReader(w.Path, &o.File); err != nil {
return task.InvalidWorker("reader error: %s", err)
}

if w.writer, err = file.NewWriter(w.Dest, &o.File); err != nil {
return task.InvalidWorker("writer error: %s", err)
}

query, err := gojq.Parse(string(jqlogic))
if err != nil {
return task.InvalidWorker("invalid jq: %s", err)
}
if w.code, err = gojq.Compile(query); err != nil {
return task.InvalidWorker("invalid jq-compile: %s", err)
}

return w
}

func (o options) Validate() error {
return nil
}

type worker struct {
Path string `uri:"origin" required:"true"`
Dest string `uri:"dest" required:"true"`
JqConfig string `uri:"jq" required:"true"`
Threads int `uri:"threads" default:"2"`

reader file.Reader
writer file.Writer
code *gojq.Code

options
}

func (w *worker) DoTask(ctx context.Context) (task.Result, string) {
in := make(chan []byte, 200)
errChan := make(chan error, w.Threads) // buffered: each worker reports at most one error
log.Printf("threads: %d", w.Threads)

var wg sync.WaitGroup
for i := 0; i < w.Threads; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for b := range in {
if err := w.process(b); err != nil {
errChan <- err
return // stop this worker after its first error
}
}
}()
}

scanner := file.NewScanner(w.reader)
for scanner.Scan() {
select {
case <-ctx.Done():
close(in)
return task.Interrupted()
case err := <-errChan:
close(in) // unblock the remaining workers so they can drain and exit
return task.Failed(err)
default:
in <- scanner.Bytes()
}
}
close(in)
wg.Wait()

// catch an error reported after the scan loop had already finished
select {
case err := <-errChan:
return task.Failed(err)
default:
}

sts := w.writer.Stats()
if sts.ByteCnt == 0 {
w.writer.Abort()
return task.Completed("no data to write")
}
if err := w.writer.Close(); err != nil {
return task.Failed(err)
}
osts, _ := file.Stat(w.Dest, &w.File)

return task.Completed("%d files processed with %d lines and %s", w.reader.Stats().Files, sts.LineCnt, humanize.IBytes(uint64(osts.Size)))
}

func (w *worker) process(line []byte) error {
data := make(map[string]interface{})
if err := jsoniter.Unmarshal(line, &data); err != nil {
return err
}
result, ok := w.code.Run(data).Next()
if !ok { // iterator exhausted: the jq program produced no output for this line
return nil
}
if err, isErr := result.(error); isErr { // gojq yields query errors as error values
return err
}

b, err := jsoniter.ConfigCompatibleWithStandardLibrary.Marshal(result)
if err != nil {
return err
}
if err := w.writer.WriteLine(b); err != nil {
return err
}
return nil
}
116 changes: 116 additions & 0 deletions apps/workers/transform/main_test.go
@@ -0,0 +1,116 @@
package main

import (
"errors"
"testing"
"time"

"github.com/hydronica/trial"
"github.com/itchyny/gojq"
"github.com/pcelvng/task"

"github.com/pcelvng/task-tools/file/mock"
"github.com/pcelvng/task-tools/file/nop"
)

const examplejson = `{"a":1,"b":12.345678901,"c":"apple","d":"dog"}`

func BenchmarkProcess(t *testing.B) {
tm := &worker{}
tm.writer, _ = nop.NewWriter("nop://")
query, err := gojq.Parse("{a: .a,b: .b,c: .c, d: .d, e: (.e // 0) }")
if err != nil {
t.Fatal(err)
}
tm.code, _ = gojq.Compile(query)

t.ResetTimer()

for i := 0; i < t.N; i++ {
tm.process([]byte(examplejson))
}
}

func TestNewWorker(t *testing.T) {
fn := func(in trial.Input) (interface{}, error) {
o := &options{}
w := o.newWorker(in.String())
if b, err := task.IsInvalidWorker(w); b {
return nil, errors.New(err)
}
return nil, nil
}
cases := trial.Cases{
"valid": {
Input: "nop://file.txt?dest=nop://output.txt&jq=nop://read_eof",
},
"no origin": {
Input: "?jq=nop://file.jq&dest=nop://output.txt",
ExpectedErr: errors.New("origin is required"),
},
"no dest": {
Input: "nop://file.txt?jq=nop://read_eof",
ExpectedErr: errors.New("dest is required"),
},
"no jq": {
Input: "nop://file.txt?dest=nop://output.txt",
ExpectedErr: errors.New("jq is required"),
},
"invalid threads": {
Input: "nop://file.txt?dest=nop://output.txt&jq=nop://read_eof&threads=0",
ExpectedErr: errors.New("threads"),
},
}
trial.New(fn, cases).Timeout(3 * time.Second).Test(t)
}

func TestWorker_Process(t *testing.T) {
type input struct {
data string
jq string
}
fn := func(in trial.Input) (interface{}, error) {
v := in.Interface().(input)

// setup the worker
w := mock.NewWriter("nop://")
wrk := &worker{
writer: w,
code: nil,
}
q, err := gojq.Parse(v.jq)
if err != nil {
return nil, err
}
wrk.code, err = gojq.Compile(q)
if err != nil {
return nil, err
}
// test the method
err = wrk.process([]byte(v.data))

// retrieve the data
lines := w.GetLines()
if len(lines) == 0 {
return "", err
}
return lines[0], err
}
cases := trial.Cases{
"passthrough": {
Input: input{
data: examplejson,
jq: ".",
},
Expected: examplejson,
},
"defaults": {
Input: input{
data: examplejson,
jq: "{e: (.e // 0)}",
},
Expected: `{"e":0}`,
},
}
trial.New(fn, cases).Test(t)
}
26 changes: 26 additions & 0 deletions apps/workers/transform/readme.md
@@ -0,0 +1,26 @@
# Transform

A generic worker that provides a means to process JSON logs. Transform uses gojq internally to modify JSON and behaves the same as running jq from the command line.

Transform differs from running jq on the command line in that it supports reading from and writing to remote services and is easily scheduled and managed using _task_.
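
For a single local file the behavior is roughly what you would get from jq itself; the sketch below assumes a local filter file named conf.jq and local input/output paths, which transform generalizes to globbed or remote (e.g. gs://) paths plus multiple threads:

```sh
# hypothetical local equivalent of one transform task: one JSON object in, one compact line out
jq -c -f conf.jq input.json > output.json
```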

## Config

- File: used to configure access for reading and writing

### Info string
`gs://path/to/file/*/*.gz?dest=gs://path/dest/output.gz&jq=./conf.jq`

- origin: `gs://path/to/file/*/*.gz` - file(s) to process
- dest: `dest=gs://path/dest/output.gz` - destination path for output
- jq: `jq=./conf.jq` - jq definition file (an example filter is shown below)
- threads: number of threads used to process the logs; increase to utilize more CPUs (default: 2)
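
The jq file is a plain jq filter applied to each input line. As an illustration (field names are hypothetical, echoing the unit tests), a minimal conf.jq that passes two fields through and defaults a missing third might be:

```jq
{a: .a, b: .b, e: (.e // 0)}
```

Any filter that gojq accepts can be used here.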

## Performance
Transform is somewhat slower than jq when run with a single thread, but scales much better with multiple threads (a benchmark invocation is sketched after the numbers below).

Basic test (115k lines):
- jq: 53s
- gojq: 28s (v0.12.0)
- transform (1 thread): 40s
- transform (2 threads): 25s
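
The transform numbers can be sanity-checked with the package benchmark; the command below assumes the repository layout in this commit and exercises only the in-process jq step, not file IO:

```sh
go test -run '^$' -bench Process ./apps/workers/transform
```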
