diff --git a/Dockerfile b/Dockerfile index a56cde44..07b30302 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,7 +5,7 @@ RUN apk add ca-certificates RUN apk add curl #confd -ADD https://github.com/kelseyhightower/confd/releases/download/v0.16.0/confd-0.16.0-linux-amd64 /usr/bin/confd +ADD https://github.com/jbsmith7741/confd/releases/download/v0.16.0-vault/confd-0.16.0-vault-darwin-amd64 /usr/bin/confd RUN chmod +x /usr/bin/confd #gojq @@ -18,4 +18,4 @@ RUN rm -rf /tmp/* RUN echo -e "#!/bin/sh \n ls -Alhp \$1" > /usr/bin/ll RUN chmod +x /usr/bin/ll -COPY build/ /usr/bin/ \ No newline at end of file +COPY build/* /usr/bin/ \ No newline at end of file diff --git a/apps/taskmasters/flowlord/taskmaster.go b/apps/taskmasters/flowlord/taskmaster.go index 88444009..14695bfc 100644 --- a/apps/taskmasters/flowlord/taskmaster.go +++ b/apps/taskmasters/flowlord/taskmaster.go @@ -49,7 +49,7 @@ type stats struct { type cEntry struct { Next time.Time Prev time.Time - Schedule string + Schedule []string Child []string `json:"Child,omitempty"` } @@ -94,10 +94,22 @@ func (tm *taskMaster) Info() interface{} { ent := cEntry{ Next: e.Next, Prev: e.Prev, - Schedule: j.Schedule, + Schedule: []string{j.Schedule + "?offset=" + gtools.PrintDuration(j.Offset)}, Child: make([]string, 0), } k := j.Topic + ":" + j.Name + + // check if for multi-scheduled entries + if e, found := sts.Entries[k]; found { + if e.Prev.After(ent.Prev) { + ent.Prev = e.Prev // keep the last run time + } + if e.Next.Before(ent.Next) { + ent.Next = e.Next // keep the next run time + } + ent.Schedule = append(ent.Schedule, e.Schedule...) + } + // add children ent.Child = tm.getAllChildren(j.Topic, j.Workflow, j.Name) sts.Entries[k] = ent } diff --git a/apps/workers/sort2file/worker_test.go b/apps/workers/sort2file/worker_test.go index e90327db..b0832874 100644 --- a/apps/workers/sort2file/worker_test.go +++ b/apps/workers/sort2file/worker_test.go @@ -372,16 +372,16 @@ func ExampleDoTaskReadLineErr() { defer os.Unsetenv("TZ") ctx, _ := context.WithCancel(context.Background()) - info := `nop://readline_err/?date-field=0&dest-template=nop://{HH}.csv&sep=,` + info := `nop://readline_err?date-field=0&dest-template=nop://{HH}.csv&sep=,` wkr := newWorker(info) result, msg := wkr.DoTask(ctx) fmt.Println(result) // output: error - fmt.Println(msg) // output: issue at line 1: readline_err (nop://readline_err/) + fmt.Println(msg) // output: issue at line 1: readline_err (nop://readline_err) // Output: // error - // issue at line 1: readline_err (nop://readline_err/) + // issue at line 1: readline_err (nop://readline_err) } func ExampleWorker_DoTaskDirSrc() { diff --git a/apps/workers/transform/main.go b/apps/workers/transform/main.go new file mode 100644 index 00000000..46e60608 --- /dev/null +++ b/apps/workers/transform/main.go @@ -0,0 +1,172 @@ +package main + +import ( + "context" + // "encoding/json" + "io/ioutil" + "log" + "sync" + + "github.com/dustin/go-humanize" + "github.com/itchyny/gojq" + "github.com/jbsmith7741/uri" + jsoniter "github.com/json-iterator/go" + "github.com/pcelvng/task" + + tools "github.com/pcelvng/task-tools" + "github.com/pcelvng/task-tools/bootstrap" + "github.com/pcelvng/task-tools/file" +) + +const ( + taskType = "transform" + description = `modify json data using jq syntax + + info params: + - origin: (required) - glob path to a file(s) to transform (extract) + - dest: (required) - file path to where the resulting data will be written + - jq: (required) - file path to a jq definition file + - threads: - number of threads to process files (default: 2) + +example +{"task":"transform","info":"gs://path/to/file/*/*.gz?dest=gs://path/dest/output.gz&jq=./conf.jq"}` +) + +type options struct { + File file.Options +} + +func main() { + opts := &options{} + + app := bootstrap.NewWorkerApp(taskType, opts.newWorker, opts). + Version(tools.String()). + Description(description). + Initialize() + + app.Run() +} + +func (o *options) newWorker(info string) task.Worker { + w := &worker{ + options: *o, + } + + if err := uri.Unmarshal(info, w); err != nil { + return task.InvalidWorker("uri error: %s", err) + } + + if w.Threads < 1 { + return task.InvalidWorker("invalid threads %d (min: 1)", w.Threads) + } + + jqreader, err := file.NewReader(w.JqConfig, &o.File) + if err != nil { + return task.InvalidWorker("jq config: %s", err) + } + jqlogic, err := ioutil.ReadAll(jqreader) + if err != nil { + return task.InvalidWorker("jq config read: %s", err) + } + + if w.reader, err = file.NewGlobReader(w.Path, &o.File); err != nil { + return task.InvalidWorker("reader error: %s", err) + } + + if w.writer, err = file.NewWriter(w.Dest, &o.File); err != nil { + return task.InvalidWorker("writer error: %s", err) + } + + query, err := gojq.Parse(string(jqlogic)) + if err != nil { + return task.InvalidWorker("invalid jq: %s", err) + } + if w.code, err = gojq.Compile(query); err != nil { + return task.InvalidWorker("invalid jq-compile: %s", err) + } + + return w +} + +func (o options) Validate() error { + return nil +} + +type worker struct { + Path string `uri:"origin" required:"true"` + Dest string `uri:"dest" required:"true"` + JqConfig string `uri:"jq" required:"true"` + Threads int `uri:"threads" default:"2"` + + reader file.Reader + writer file.Writer + code *gojq.Code + + options +} + +func (w *worker) DoTask(ctx context.Context) (task.Result, string) { + in := make(chan []byte, 200) + errChan := make(chan error) + log.Printf("threads: %d", w.Threads) + + var wg sync.WaitGroup + for i := 0; i < w.Threads; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for b := range in { + if err := w.process(b); err != nil { + errChan <- err + } + } + }() + } + + scanner := file.NewScanner(w.reader) + for scanner.Scan() { + select { + case <-ctx.Done(): + close(in) + return task.Interrupted() + case err := <-errChan: + return task.Failed(err) + default: + in <- scanner.Bytes() + } + } + close(in) + wg.Wait() + + sts := w.writer.Stats() + if sts.ByteCnt == 0 { + w.writer.Abort() + return task.Completed("no data to write") + } + if err := w.writer.Close(); err != nil { + return task.Failed(err) + } + osts, _ := file.Stat(w.Dest, &w.File) + + return task.Completed("%d files processed with %d lines and %s", w.reader.Stats().Files, sts.LineCnt, humanize.IBytes(uint64(osts.Size))) +} + +func (w *worker) process(line []byte) error { + data := make(map[string]interface{}) + if err := jsoniter.Unmarshal(line, &data); err != nil { + return err + } + result, ok := w.code.Run(data).Next() + if !ok { + return result.(error) + } + + b, err := jsoniter.ConfigCompatibleWithStandardLibrary.Marshal(result) + if err != nil { + return err + } + if err := w.writer.WriteLine(b); err != nil { + return err + } + return nil +} diff --git a/apps/workers/transform/main_test.go b/apps/workers/transform/main_test.go new file mode 100644 index 00000000..5ca15334 --- /dev/null +++ b/apps/workers/transform/main_test.go @@ -0,0 +1,116 @@ +package main + +import ( + "errors" + "testing" + "time" + + "github.com/hydronica/trial" + "github.com/itchyny/gojq" + "github.com/pcelvng/task" + + "github.com/pcelvng/task-tools/file/mock" + "github.com/pcelvng/task-tools/file/nop" +) + +const examplejson = `{"a":1,"b":12.345678901,"c":"apple","d":"dog"}` + +func BenchmarkProcess(t *testing.B) { + tm := &worker{} + tm.writer, _ = nop.NewWriter("nop://") + query, err := gojq.Parse("{a: .a,b: .b,c: .c, d: .d, e: (.e // 0) }") + if err != nil { + t.Fatal(err) + } + tm.code, _ = gojq.Compile(query) + + t.ResetTimer() + + for i := 0; i < t.N; i++ { + tm.process([]byte(examplejson)) + } +} + +func TestNewWorker(t *testing.T) { + fn := func(in trial.Input) (interface{}, error) { + o := &options{} + w := o.newWorker(in.String()) + if b, err := task.IsInvalidWorker(w); b { + return nil, errors.New(err) + } + return nil, nil + } + cases := trial.Cases{ + "valid": { + Input: "nop://file.txt?dest=nop://output.txt&jq=nop://read_eof", + }, + "no origin": { + Input: "?jq=nop://file.jq&dest=nop://output.txt", + ExpectedErr: errors.New("origin is required"), + }, + "no dest": { + Input: "nop://file.txt?jq=nop://read_eof", + ExpectedErr: errors.New("dest is required"), + }, + "no jq": { + Input: "nop://file.txt?dest=nop://output.txt", + ExpectedErr: errors.New("jq is required"), + }, + "invalid threads": { + Input: "nop://file.txt?dest=nop://output.txt&jq=nop://read_eof&threads=0", + ExpectedErr: errors.New("threads"), + }, + } + trial.New(fn, cases).Timeout(3 * time.Second).Test(t) +} + +func TestWorker_Process(t *testing.T) { + type input struct { + data string + jq string + } + fn := func(in trial.Input) (interface{}, error) { + v := in.Interface().(input) + + // setup the worker + w := mock.NewWriter("nop://") + wrk := &worker{ + writer: w, + code: nil, + } + q, err := gojq.Parse(v.jq) + if err != nil { + return nil, err + } + wrk.code, err = gojq.Compile(q) + if err != nil { + return nil, err + } + // test the method + err = wrk.process([]byte(v.data)) + + // retrieve the data + lines := w.GetLines() + if len(lines) == 0 { + return "", err + } + return lines[0], err + } + cases := trial.Cases{ + "passthrough": { + Input: input{ + data: examplejson, + jq: ".", + }, + Expected: examplejson, + }, + "defaults": { + Input: input{ + data: examplejson, + jq: "{e: (.e // 0)}", + }, + Expected: `{"e":0}`, + }, + } + trial.New(fn, cases).Test(t) +} diff --git a/apps/workers/transform/readme.md b/apps/workers/transform/readme.md new file mode 100644 index 00000000..4016310f --- /dev/null +++ b/apps/workers/transform/readme.md @@ -0,0 +1,26 @@ +# Transform + +a generic worker that provides a means to process json logs. Transform uses gojq internal to modify json and will behave the same as if running jq from the command line. + +Transform is different than running on the command line as it has support for reading and writing to remote services and is easily scheduled and managed using _task_. + +## Config + + - File: used to configure access for reading and writing + +### Info string +`gs://path/to/file/*/*.gz?dest=gs://path/dest/output.gz&jq=./conf.jq` + + - origin: `gs://path/to/file/*/*.gz` - file(s) to process + - dest: `dest=gs://path/dest/output.gz` - destination path for output + - jq: `jq=./conf.jq` - jq definition file +- Threads: number of threads to use process the logs, increase to utilize more CPUs + +## Performance +Transform performs a bit slower than jq single threaded, but runs much better with multiple threads. + +basic : 115k lines test +- jq: 53s +- gojq: 28s (v0.12.0) +- transform 1 thread: 40s +- transform 2 threads: 25s \ No newline at end of file diff --git a/file/file.go b/file/file.go index f11d8431..0df486f0 100644 --- a/file/file.go +++ b/file/file.go @@ -7,6 +7,7 @@ import ( "net/url" "path" "path/filepath" + "strings" "github.com/pcelvng/task-tools/file/gs" "github.com/pcelvng/task-tools/file/local" @@ -257,35 +258,50 @@ func Stat(path string, opt *Options) (stat.Stats, error) { return local.Stat(path) } -// Glob will only match to files and will -// not match recursively. Only files directly in pthDir -// are candidates for matching. +// Glob will match to files and folder // // Supports the same globing patterns as provided in *nix // terminals. // -// Globing in directories is not supported. -// ie - s3://bucket/path/*/files.txt will not work +// Globing in directories is supported. +// ie - s3://bucket/path/*/files.txt will work +// s3://bucket/path/dir[0-5]*/*.txt will work // but s3://bucket/path/to/*.txt will work. func Glob(pth string, opt *Options) ([]stat.Stats, error) { if opt == nil { opt = NewOptions() } pthDir, pattern := path.Split(pth) + folders := []string{pthDir} + // check pthDir for pattern matches + if strings.ContainsAny(pthDir, "[]*?") { + f, err := matchFolder(pthDir, opt) + if err != nil { + return nil, err + } + folders = make([]string, len(f)) + for i, v := range f { + folders[i] = v.Path + } + } + allSts := make([]stat.Stats, 0) // get all files in dir - allSts, err := List(pthDir, opt) - if err != nil { - return nil, err + for _, f := range folders { + sts, err := List(f, opt) + if err != nil { + return nil, err + } + allSts = append(allSts, sts...) } // filter out files that don't match the glob pattern glbSts := make([]stat.Stats, 0) for _, sts := range allSts { + _, fName := path.Split(sts.Path) if sts.IsDir { continue } - _, fName := path.Split(sts.Path) isMatch, err := filepath.Match(pattern, fName) if err != nil { return nil, err @@ -295,10 +311,46 @@ func Glob(pth string, opt *Options) ([]stat.Stats, error) { glbSts = append(glbSts, sts) } } - return glbSts, nil } +func matchFolder(pth string, opt *Options) (folders []stat.Stats, err error) { + pthDir, pattern := path.Split(strings.TrimRight(pth, "/")) + paths := []string{pthDir} + + // check pthDir for pattern matches + if strings.ContainsAny(pthDir, "[]*?") { + pthDir, pattern = path.Split(strings.TrimRight(pthDir, "/")) + sts, err := matchFolder(pthDir, opt) + if err != nil { + return nil, err + } + paths = make([]string, 0) + for _, f := range sts { + paths = append(paths, f.Path) + } + } + for _, p := range paths { + sts, err := List(p, opt) + if err != nil { + return nil, err + } + for _, f := range sts { + if !f.IsDir { + continue + } + _, fName := path.Split(strings.TrimRight(f.Path, "/")) + if isMatch, err := filepath.Match(pattern, fName); err != nil { + return nil, err + } else if isMatch { + folders = append(folders, f) + } + } + } + + return folders, nil +} + // parseScheme will return the pth scheme (if exists). // If there is no scheme then an empty string is returned. func parseScheme(pth string) string { diff --git a/file/file_test.go b/file/file_test.go index b5fbb579..149a9a29 100644 --- a/file/file_test.go +++ b/file/file_test.go @@ -9,10 +9,9 @@ import ( "net/url" "strings" - "path" - "fmt" + "github.com/hydronica/trial" minio "github.com/minio/minio-go" "github.com/pcelvng/task-tools/file/s3" ) @@ -22,139 +21,28 @@ var ( // // see: // https://docs.minio.io/docs/golang-client-api-reference - testEndpoint = "play.minio.io:9000" - testAccessKey = "Q3AM3UQ867SPQQA43P2F" - testSecretKey = "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG" - testBucket = "task-tools-test" - testS3Client *minio.Client -) - -func TestGlob_Local(t *testing.T) { - // create files - pths := []string{ - "./test/file-1.txt", - "./test/file2.txt", - "./test/file3.gz", - } - for _, pth := range pths { - createFile(pth) - } - - // test '*' - pth := "./test/*.txt" - allSts, err := Glob(pth, nil) - if err != nil { - t.Errorf("expected nil but got '%v'\n", err.Error()) - } - - if len(allSts) != 2 { - t.Errorf("expected %v got %v\n", 2, len(allSts)) - } - - // test '?' - pth = "./test/file?.txt" - allSts, err = Glob(pth, nil) - if err != nil { - t.Errorf("expected nil but got '%v'\n", err.Error()) - } - - if len(allSts) == 1 { - f1 := allSts[0] - expected := "file2.txt" - _, got := path.Split(f1.Path) - if got != expected { - t.Errorf("expected file '%v' got '%v'\n", expected, got) - } - } else { - t.Errorf("expected %v got %v\n", 1, len(allSts)) - } - - // test '?' with '*' - pth = "./test/file?.*" - allSts, err = Glob(pth, nil) - if err != nil { - t.Errorf("expected nil but got '%v'\n", err.Error()) - } - - if len(allSts) != 2 { - t.Errorf("expected %v got %v\n", 2, len(allSts)) - } - - // cleanup - for _, pth := range pths { - rmFile(pth) - } - rmFile("./test") -} - -func TestGlob_S3(t *testing.T) { - // create files - opt := NewOptions() - opt.AccessKey = testAccessKey - opt.SecretKey = testSecretKey - pthDir := fmt.Sprintf("s3://%v/test", testBucket) - pths := []string{ - fmt.Sprintf("%v/file-1.txt", pthDir), - fmt.Sprintf("%v/file2.txt", pthDir), - fmt.Sprintf("%v/file3.gz", pthDir), - } - for _, pth := range pths { - createFile(pth) - } - - // test '*' - pth := fmt.Sprintf("%v/*.txt", pthDir) - allSts, err := Glob(pth, opt) - if err != nil { - t.Errorf("expected nil but got '%v'\n", err.Error()) - } - - if len(allSts) != 2 { - t.Errorf("expected %v got %v\n", 2, len(allSts)) - } - - // test '?' - pth = fmt.Sprintf("%v/file?.txt", pthDir) - allSts, err = Glob(pth, opt) - if err != nil { - t.Errorf("expected nil but got '%v'\n", err.Error()) - } + testEndpoint = "play.minio.io:9000" + testBucket = "task-tools-test" + testS3Client *minio.Client - if len(allSts) == 1 { - f1 := allSts[0] - expected := "file2.txt" - _, got := path.Split(f1.Path) - if got != expected { - t.Errorf("expected file '%v' got '%v'\n", expected, got) - } - } else { - t.Errorf("expected %v got %v\n", 1, len(allSts)) + wd string + opts = Options{ + AccessKey: "Q3AM3UQ867SPQQA43P2F", + SecretKey: "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG", } +) - // test '?' with '*' - pth = fmt.Sprintf("%v/file?.*", pthDir) - allSts, err = Glob(pth, opt) - if err != nil { - t.Errorf("expected nil but got '%v'\n", err.Error()) - } +func TestMain(m *testing.M) { - if len(allSts) != 2 { - t.Errorf("expected %v got %v\n", 2, len(allSts)) - } + // setup local files test + wd, _ = os.Getwd() - // cleanup - for _, pth := range pths { - rmS3File(pth) - } -} - -func TestMain(m *testing.M) { - // setup + // setup remote (minio/s3/gcs) test s3.StoreHost = testEndpoint // set test endpoint // s3 client var err error - testS3Client, err = minio.New(s3.StoreHost, testAccessKey, testSecretKey, true) + testS3Client, err = minio.New(s3.StoreHost, opts.AccessKey, opts.SecretKey, true) if err != nil { log.Println(err.Error()) os.Exit(1) @@ -167,47 +55,127 @@ func TestMain(m *testing.M) { os.Exit(1) } + // create files + pths := []string{ + "test/file-1.txt", + "test/file2.txt", + "test/file3.gz", + "test/f1/file4.gz", + "test/f3/file5.txt", + "test/f5/file-6.txt", + } + for _, pth := range pths { + createFile("./"+pth, &opts) // local + createFile(fmt.Sprintf("s3://%s/%s", testBucket, pth), &opts) // remote + } + code := m.Run() + // cleanup + os.RemoveAll("./test/") rmS3Bucket(testBucket) os.Exit(code) } -func createFile(pth string) { - opt := NewOptions() - opt.SecretKey = testSecretKey - opt.AccessKey = testAccessKey +func TestGlob_Local(t *testing.T) { + fn := func(input trial.Input) (interface{}, error) { + sts, err := Glob(input.String(), nil) + files := make([]string, len(sts)) + for i := 0; i < len(sts); i++ { + files[i] = strings.Replace(sts[i].Path, wd, ".", -1) + } + return files, err + } + cases := trial.Cases{ + "star.txt": { + Input: "./test/*.txt", + Expected: []string{"./test/file-1.txt", "./test/file2.txt"}, + }, + "file?.txt": { + Input: "./test/file?.txt", + Expected: []string{"./test/file2.txt"}, + }, + "file?.star": { + Input: "./test/file?.*", + Expected: []string{"./test/file2.txt", "./test/file3.gz"}, + }, + "folders": { + Input: "./test/*/*", + Expected: []string{"./test/f1/file4.gz", "./test/f3/file5.txt", "./test/f5/file-6.txt"}, + }, + "range": { + Input: "test/f[1-3]/*", + Expected: []string{"./test/f1/file4.gz", "./test/f3/file5.txt"}, + }, + "folder/star.txt": { + Input: "test/*/*.txt", + Expected: []string{"./test/f3/file5.txt", "./test/f5/file-6.txt"}, + }, + "file": { + Input: "test/file2.txt", + Expected: []string{"./test/file2.txt"}, + }, + "nop/file": { + Input: "nop://file.txt", //NOTE nop is hard-coded to return file.txt + Expected: []string{"nop://file.txt"}, + }, + } + trial.New(fn, cases).SubTest(t) + +} + +func TestGlob_S3(t *testing.T) { + path := "s3://" + testBucket + fn := func(input trial.Input) (interface{}, error) { + sts, err := Glob(input.String(), &opts) + files := make([]string, len(sts)) + for i := 0; i < len(sts); i++ { + files[i] = sts[i].Path + } + return files, err + } + cases := trial.Cases{ + "star.txt": { + Input: path + "/test/*.txt", + Expected: []string{path + "/test/file-1.txt", path + "/test/file2.txt"}, + }, + "file?.txt": { + Input: path + "/test/file?.txt", + Expected: []string{path + "/test/file2.txt"}, + }, + "file?.star": { + Input: path + "/test/file?.*", + Expected: []string{path + "/test/file2.txt", path + "/test/file3.gz"}, + }, + "folders": { + Input: path + "/test/*/*", + Expected: []string{path + "/test/f1/file4.gz", path + "/test/f3/file5.txt", path + "/test/f5/file-6.txt"}, + }, + "range": { + Input: path + "/test/f[1-3]/*", + Expected: []string{path + "/test/f1/file4.gz", path + "/test/f3/file5.txt"}, + }, + "folder/star.txt": { + Input: path + "/test/*/*.txt", + Expected: []string{path + "/test/f3/file5.txt", path + "/test/f5/file-6.txt"}, + }, + } + trial.New(fn, cases).SubTest(t) +} +func createFile(pth string, opt *Options) { w, _ := NewWriter(pth, opt) w.WriteLine([]byte("test line")) w.WriteLine([]byte("test line")) w.Close() } -func rmFile(pth string) { - fType := parseScheme(pth) - - if fType == "s3" { - rmS3File(pth) - } - - if fType == "nop" { - return - } - - os.Remove(pth) -} - func createBucket(bckt string) error { exists, err := testS3Client.BucketExists(bckt) - if err != nil { + if err != nil || exists { return err } - if exists { - return nil - } - return testS3Client.MakeBucket(bckt, "us-east-1") } diff --git a/file/globreader.go b/file/globreader.go new file mode 100644 index 00000000..73d32f97 --- /dev/null +++ b/file/globreader.go @@ -0,0 +1,106 @@ +package file + +import ( + "fmt" + "io" + "sync" + + "github.com/pcelvng/task-tools/file/stat" +) + +func NewGlobReader(path string, opts *Options) (_ Reader, err error) { + r := &GlobReader{ + path: path, + opts: *opts, + sts: stat.Stats{ + Path: path, + }, + } + if r.files, err = Glob(path, opts); err != nil { + return nil, err + } + if err := r.nextFile(); err != nil { + return nil, fmt.Errorf("no files (%d) found for %s", len(r.files), path) + } + + return r, nil +} + +type GlobReader struct { + mu sync.RWMutex + + path string + opts Options + sts stat.Stats + + files []stat.Stats + fileIndex int + reader Reader +} + +func (g *GlobReader) nextFile() (err error) { + g.mu.Lock() + defer g.mu.Unlock() + if len(g.files) <= g.fileIndex { + g.reader = nil + return io.EOF + } + if g.reader != nil { + sts := g.reader.Stats() + g.sts.ByteCnt += sts.ByteCnt + g.sts.LineCnt += sts.LineCnt + g.sts.Size += sts.Size + g.reader.Close() + } + g.reader, err = NewReader(g.files[g.fileIndex].Path, &g.opts) + g.fileIndex++ + g.sts.Files = int64(g.fileIndex) + + return err +} + +func (g *GlobReader) Read(p []byte) (n int, err error) { + if g.reader == nil { + return 0, io.EOF + } + + g.mu.RLock() + n, err = g.reader.Read(p) + g.mu.RUnlock() + + if err == io.EOF { + err = g.nextFile() + } + return n, err +} + +func (g *GlobReader) Close() error { + return g.reader.Close() +} + +func (g *GlobReader) ReadLine() (b []byte, err error) { + if g.reader == nil { + return b, io.EOF + } + + g.mu.RLock() + b, err = g.reader.ReadLine() + g.mu.RUnlock() + + if err == io.EOF { + err = g.nextFile() + } + return b, err +} + +func (g *GlobReader) Stats() stat.Stats { + sts := g.sts + if g.reader != nil { + s := g.reader.Stats() + sts.ByteCnt += s.ByteCnt + sts.LineCnt += s.LineCnt + sts.Size += s.Size + sts.Files++ + } + return sts +} diff --git a/file/nop/read.go b/file/nop/read.go index fc95a429..25d554b2 100644 --- a/file/nop/read.go +++ b/file/nop/read.go @@ -175,6 +175,9 @@ func ListFiles(pth string) ([]stat.Stats, error) { if errMode != nil && errMode.Host == "err" { return nil, errors.New(errMode.Host) } + if pth[len(pth)-1] == '/' { + pth += "file.txt" + } sts.SetPath(pth) return []stat.Stats{sts}, nil diff --git a/file/stat/stat.go b/file/stat/stat.go index 1d6670e5..5a94960d 100644 --- a/file/stat/stat.go +++ b/file/stat/stat.go @@ -58,6 +58,8 @@ type Stats struct { IsDir bool `json:"-"` + Files int64 `json:"-"` + mu sync.Mutex } diff --git a/go.mod b/go.mod index fd0289a6..462c960d 100644 --- a/go.mod +++ b/go.mod @@ -16,8 +16,9 @@ require ( github.com/google/uuid v1.1.2 github.com/hydronica/toml v0.4.1 github.com/hydronica/trial v0.5.0 + github.com/itchyny/gojq v0.12.0 github.com/jarcoal/httpmock v1.0.6 - github.com/jbsmith7741/go-tools v0.4.0 + github.com/jbsmith7741/go-tools v0.4.1 github.com/jbsmith7741/trial v0.3.1 github.com/jbsmith7741/uri v0.4.1 github.com/jmoiron/sqlx v1.2.0 diff --git a/go.sum b/go.sum index 954ea923..fc26da0b 100644 --- a/go.sum +++ b/go.sum @@ -108,6 +108,7 @@ github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.m github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= +github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4= github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= @@ -236,12 +237,21 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1: github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= +github.com/itchyny/astgen-go v0.0.0-20200815150004-12a293722290 h1:9ZAJ5+eh9dfcPsJ1CXoiE16JzsBmJm1e124eUkXAyc0= +github.com/itchyny/astgen-go v0.0.0-20200815150004-12a293722290/go.mod h1:296z3W7Xsrp2mlIY88ruDKscuvrkL6zXCNRtaYVshzw= +github.com/itchyny/go-flags v1.5.0/go.mod h1:lenkYuCobuxLBAd/HGFE4LRoW8D3B6iXRQfWYJ+MNbA= +github.com/itchyny/gojq v0.12.0 h1:Gv367aLowY1uIoL1bP87h5ARY1bKMB5O6KBEqHK9mq8= +github.com/itchyny/gojq v0.12.0/go.mod h1:gIO0gJG9sCJ8fOJwF65n/nqKbVhvPtP8N+RjbmoixAY= +github.com/itchyny/timefmt-go v0.1.1 h1:rLpnm9xxb39PEEVzO0n4IRp0q6/RmBc7Dy/rE4HrA0U= +github.com/itchyny/timefmt-go v0.1.1/go.mod h1:0osSSCQSASBJMsIZnhAaF1C2fCBTJZXrnj37mG8/c+A= github.com/jarcoal/httpmock v1.0.5/go.mod h1:ATjnClrvW/3tijVmpL/va5Z3aAyGvqU3gCT8nX0Txik= github.com/jarcoal/httpmock v1.0.6 h1:e81vOSexXU3mJuJ4l//geOmKIt+Vkxerk1feQBC8D0g= github.com/jarcoal/httpmock v1.0.6/go.mod h1:ATjnClrvW/3tijVmpL/va5Z3aAyGvqU3gCT8nX0Txik= github.com/jbsmith7741/go-tools v0.2.0/go.mod h1:UIlRAZ6aTSLlVI+owNLWbVS9e/Rywzahvurv/PUVBKg= github.com/jbsmith7741/go-tools v0.4.0 h1:UpKLVkPQRZEvxec110Nh0f+GeOiRSHo5oCpw/eD3AHs= github.com/jbsmith7741/go-tools v0.4.0/go.mod h1:8v8ffjiI3qOs6epawzxmPB7AOKoNNxZHKPl2VUWXoyY= +github.com/jbsmith7741/go-tools v0.4.1 h1:qqHI0imezT1SLCd43df4sBZ44b4bndfo0W9R9B53/kA= +github.com/jbsmith7741/go-tools v0.4.1/go.mod h1:8v8ffjiI3qOs6epawzxmPB7AOKoNNxZHKPl2VUWXoyY= github.com/jbsmith7741/trial v0.3.1 h1:JZ0/w3lhfH4iacf9R2DnZWtTMa/Uf4O13gnuMLTub/M= github.com/jbsmith7741/trial v0.3.1/go.mod h1:M4FQWUgVpPY2+i53L2nSB0AyPc86kSTIigcr9Q7XQlY= github.com/jbsmith7741/uri v0.4.0/go.mod h1:Ctt8YJ5gCFx5BX/FMFg5VkwuI9buBcvsITIiSMH+TeA= @@ -282,9 +292,15 @@ github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-b github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= +github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= +github.com/mattn/go-colorable v0.1.7/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= +github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= +github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-sqlite3 v1.9.0 h1:pDRiWfl+++eC2FEFRy6jXmQlvp4Yh3z1MJKg4UeYM/4= github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= @@ -540,6 +556,7 @@ golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -550,11 +567,13 @@ golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -764,6 +783,8 @@ gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ= +gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=