From ffcda64fd18fbcec440648635b590749534a70b1 Mon Sep 17 00:00:00 2001
From: Joshua Smith
Date: Tue, 2 Mar 2021 15:13:50 -0700
Subject: [PATCH] fix lockup on errs on every row (#151)

* fix lockup on errs on every row

* add default support and unit tests

* fix globreader file stats and misc errors
---
 .gitignore                                    |   2 +-
 apps/taskmasters/flowlord/flowlord.drawio.svg |  57 +++---
 apps/workers/sql-load/main.go                 |  10 +-
 apps/workers/sql-load/worker.go               | 129 ++++++-------
 apps/workers/sql-load/worker_test.go          | 173 ++++++++++++++----
 file/file_test.go                             |   1 -
 file/globreader.go                            |  15 +-
 7 files changed, 236 insertions(+), 151 deletions(-)

diff --git a/.gitignore b/.gitignore
index 6aeeb921..89761305 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,7 +13,7 @@ apps/workers/deduper/deduper
 apps/workers/sort2file/sort2file
 apps/utils/file-watcher/file-watcher
 */stats/stats
-*/sql-load/sql-load
+apps/workers/sql-load/sql-load
 build
 coverage
\ No newline at end of file
diff --git a/apps/taskmasters/flowlord/flowlord.drawio.svg b/apps/taskmasters/flowlord/flowlord.drawio.svg
index c0682c10..7b69e646 100644
--- a/apps/taskmasters/flowlord/flowlord.drawio.svg
+++ b/apps/taskmasters/flowlord/flowlord.drawio.svg
[SVG diagram diff omitted: repositions the "cron" element and the "workflows/" label in the flowlord drawio diagram; no textual or behavioral change]
diff --git a/apps/workers/sql-load/main.go b/apps/workers/sql-load/main.go
index 292cb400..a6cf4da5 100644
--- a/apps/workers/sql-load/main.go
+++ b/apps/workers/sql-load/main.go
@@ -19,7 +19,6 @@ type options struct {
 	MySQL bootstrap.DBOptions `toml:"mysql"`
 
 	sqlDB *sql.DB
-	// sqlxDB *sqlx.DB // used for running direct exec command
 
 	producer bus.Producer
 	fileOpts *file.Options
@@ -38,14 +37,15 @@ delete  : allows insert into pre-existing data by deleting previous data.
 	- "?delete=date:2020-07-01|id:7"
 truncate: allows insert into pre-existing table by truncating before insertion
 fields  : allows mapping different json key values to different database column names
-	- provide a list of field name mapping {json key name}:{DB column name} to be mapped
-	- ?fields=jsonKey:dbColumnName
-
+	- provide a list of field name mappings {DB column name}:{json key name}
+	- ?fields=dbColumnName:jsonKey
+cached_insert: improves insert times by caching data into a temp table
+batch_size: (default: 1000) number of rows to insert at a time (a higher number increases memory usage)
 
 Example task:
 
 {"type":"sql_load","info":"gs://bucket/path/to/file.json?table=schema.table_name&delete=date:2020-07-01|id:7"}
 {"type":"sql_load","info":"gs://bucket/path/of/files/to/load/?table=schema.table_name"}
-{"type":"sql_load","info":"gs://bucket/path/to/file.json?table=schema.table_name&delete=date:2020-07-01|id:7&fields=jsonKeyValue:dbColumnName"}`
+{"type":"sql_load","info":"gs://bucket/path/to/file.json?table=schema.table_name&delete=date:2020-07-01|id:7&fields=dbColumnName:jsonKeyValue"}`
 )
 
 func init() {
diff --git a/apps/workers/sql-load/worker.go b/apps/workers/sql-load/worker.go
index f8c52577..5692266a 100644
--- a/apps/workers/sql-load/worker.go
+++ b/apps/workers/sql-load/worker.go
@@ -11,7 +11,6 @@ import (
 	"sort"
 	"strconv"
 	"strings"
-	"sync"
 	"sync/atomic"
 	"time"
 
@@ -40,7 +39,8 @@ type worker struct {
 
 	Params InfoURI
 
-	flist []string // list of full path file(s)
+	//flist []string // list of full path file(s)
+	fReader file.Reader
 
 	ds       *DataSet // the processing data for loading
 	delQuery string   // query statement built from DeleteMap
@@ -77,7 +77,6 @@ func (o *options) newWorker(info string) task.Worker {
 	w := &worker{
 		options: *o,
 		Meta:    task.NewMeta(),
-		flist:   make([]string, 0),
 		ds:      NewDataSet(),
 	}
 
@@ -85,23 +84,12 @@ func (o *options) newWorker(info string) task.Worker {
 		return task.InvalidWorker("params uri.unmarshal: %v", err)
 	}
 
-	f, err := file.Stat(w.Params.FilePath, w.fileOpts)
+	r, err := file.NewGlobReader(w.Params.FilePath, w.fileOpts)
 	if err != nil {
-		return task.InvalidWorker("filepath os: %v", err)
-	}
-	// app will load one file or a directory of files (only one folder deep)
-	if f.IsDir {
-		list, _ := file.List(w.Params.FilePath, w.fileOpts)
-		for i := range list {
-			w.flist = append(w.flist, list[i].Path)
-		}
-	} else {
-		w.flist = append(w.flist, w.Params.FilePath)
+		return task.InvalidWorker("%v", err)
 	}
+	w.fReader = r
 
-	if len(w.flist) == 0 {
-		return task.InvalidWorker("no files found in path %s", w.Params.FilePath)
-	}
 	if len(w.Params.DeleteMap) > 0 && w.Params.Truncate {
 		return task.InvalidWorker("truncate can not be used with delete fields")
 	}
@@ -125,7 +113,7 @@ func (w *worker) DoTask(ctx context.Context) (task.Result, string) {
 	rowChan := make(chan Row, 100)
 	w.ds.dbSchema, w.ds.insertCols = PrepareMeta(w.ds.dbSchema, w.Params.FieldsMap)
 
-	go w.ds.ReadFiles(ctx, w.flist, w.fileOpts, rowChan, w.Params.SkipErr)
+	go w.ds.ReadFiles(ctx, w.fReader, rowChan, w.Params.SkipErr)
 	retry := 0
 	if w.Params.CachedInsert && w.dbDriver == "postgres" {
@@ -310,14 +298,14 @@ func (w *worker) QuerySchema() (err error) {
 
 // ReadFiles uses a files list and file.Options to read files and process data into a Dataset
 // it will build the cols and rows for each file
-func (ds *DataSet) ReadFiles(ctx context.Context, files []string, fOpts *file.Options, rowChan chan Row, skipErrors bool) {
+func (ds *DataSet) ReadFiles(ctx context.Context, files file.Reader, rowChan chan Row, skipErrors bool) {
 	errChan := make(chan error, 20)
 	dataIn := make(chan []byte, 20)
-	var wg sync.WaitGroup
+	var activeThreads int32
 	for i := 0; i < 20; i++ {
-		wg.Add(1)
+		activeThreads++
 		go func() { // unmarshaler
-			defer wg.Done()
+			defer func() { atomic.AddInt32(&activeThreads, -1) }()
 			for b := range dataIn {
 				var j Jsondata
 				if e := json.Unmarshal(b, &j); e != nil {
@@ -335,59 +323,51 @@ func (ds *DataSet) ReadFiles(ctx context.Context, files []string, fOpts *file.Op
 		}()
 	}
 
-	// read each file
-	for i := range files {
-		r, err := file.NewReader(files[i], fOpts) // create a new file reader
-		if err != nil {
-			ds.err = fmt.Errorf("new reader error %w", err)
-			break
-		}
-
-		// read the lines of the file
-	loop:
-		for {
-			select {
-			case <-ctx.Done():
+	// read the lines of the file
+loop:
+	for {
+		select {
+		case <-ctx.Done():
+			break loop
+		case err := <-errChan:
+			if skipErrors {
+				ds.skipCount++
+				log.Println(err)
+			} else {
+				ds.err = err
 				break loop
-			case e := <-errChan:
-				if skipErrors {
-					ds.skipCount++
-					log.Println(e)
-				} else {
-					ds.err = e
+			}
+		default:
+			line, err := files.ReadLine()
+			if err != nil {
+				if err == io.EOF {
 					break loop
 				}
-			default:
-				line, e := r.ReadLine()
-				if e != nil {
-					if e == io.EOF {
-						break loop
-					}
-					errChan <- fmt.Errorf("readline error %v - %w", r.Stats().Path, err)
-					continue
-				}
-				dataIn <- line
+				errChan <- fmt.Errorf("readline error %v - %w", files.Stats().Path, err)
+				continue
 			}
+			dataIn <- line
 		}
-
-		log.Println("processed file", r.Stats().Path)
-		r.Close() // close the reader
-		if ds.err != nil {
-			break
-		} // readline
-	} // read file
+	}
+	files.Close() // close the reader
+	sts := files.Stats()
+	log.Printf("processed %d files at %s", sts.Files, sts.Path)
 
 	close(dataIn)
-	wg.Wait()
-	select {
-	case e := <-errChan:
-		if skipErrors {
-			log.Println(e)
-			ds.skipCount++
-		} else {
-			ds.err = e
+	for {
+		select {
+		case e := <-errChan:
+			if skipErrors {
+				log.Println(e)
+				ds.skipCount++
+			} else {
+				ds.err = e
+			}
+		default:
+		}
+		if i := atomic.LoadInt32(&activeThreads); i == 0 {
+			break
 		}
-	default:
 	}
 	close(rowChan)
 	close(errChan)
@@ -411,6 +391,16 @@ func PrepareMeta(dbSchema []DbColumn, fieldMap map[string]string) (meta []DbColu
 		jKey := k.Name
 		if v := fieldMap[k.Name]; v != "" {
 			jKey = v
+			if k.Default == nil && !k.Nullable {
+				var s string
+				switch k.TypeName {
+				case "int":
+					s = "0"
+				case "float":
+					s = "0.0"
+				}
+				k.Default = &s
+			}
 		}
 		// skip designated fields
 		if jKey == "-" {
@@ -436,7 +426,10 @@ func MakeRow(dbSchema []DbColumn, j Jsondata) (row Row, err error) {
 	for k, f := range dbSchema {
 		v, found := j[f.JsonKey]
 		if !found && !f.Nullable {
-			return nil, fmt.Errorf("%v is required", f.JsonKey)
+			if f.Default == nil {
+				return nil, fmt.Errorf("%v is required", f.JsonKey)
+			}
+			j[f.JsonKey] = *f.Default
 		}
 		switch x := v.(type) {
 		case string:
diff --git a/apps/workers/sql-load/worker_test.go b/apps/workers/sql-load/worker_test.go
index f3632863..7d0edc90 100644
--- a/apps/workers/sql-load/worker_test.go
+++ b/apps/workers/sql-load/worker_test.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"errors"
 	"os"
 	"path/filepath"
@@ -11,6 +12,7 @@ import (
 	"github.com/hydronica/trial"
 	"github.com/pcelvng/task"
 	"github.com/pcelvng/task-tools/file"
+	"github.com/pcelvng/task-tools/file/mock"
 )
 
 func TestDefaultUpdate(t *testing.T) {
@@ -64,8 +66,8 @@ func TestPrepareMeta(t *testing.T) {
 			},
 			Expected: output{
 				schema: []DbColumn{
-					{Name: "C1", JsonKey: "J1"},
-					{Name: "C2", JsonKey: "J2"},
+					{Name: "C1", JsonKey: "J1", Default: trial.StringP("")},
+					{Name: "C2", JsonKey: "J2", Default: trial.StringP("")},
 				},
 				columns: []string{"C1", "C2"},
 			},
@@ -73,14 +75,14 @@ func TestPrepareMeta(t *testing.T) {
 		"Partial json mapping": {
 			Input: input{
 				schema: []DbColumn{
-					{Name: "C1"}, {Name: "C2"},
+					{Name: "C1", Nullable: true}, {Name: "C2", Nullable: true},
 				},
 				fields: map[string]string{"C1": "J1", "C3": "J2"},
 			},
 			Expected: output{
 				schema: []DbColumn{
-					{Name: "C1", JsonKey: "J1"},
-					{Name: "C2", JsonKey: "C2"},
+					{Name: "C1", JsonKey: "J1", Nullable: true},
+					{Name: "C2", JsonKey: "C2", Nullable: true},
 				},
 				columns: []string{"C1", "C2"},
 			},
@@ -113,6 +115,24 @@ func TestPrepareMeta(t *testing.T) {
 				columns: []string{"C1", "C3"},
 			},
 		},
+		"add defaults when in fieldMap": {
+			Input: input{
+				schema: []DbColumn{
+					{Name: "id", Nullable: false, TypeName: "int"},
+					{Name: "name", Nullable: false, TypeName: "string"},
+					{Name: "value", Nullable: false, TypeName: "float"},
+				},
+				fields: map[string]string{"id": "json_id", "name": "jName", "value": "jvalue"},
+			},
+			Expected: output{
+				schema: []DbColumn{
+					{Name: "id", JsonKey: "json_id", Default: trial.StringP("0"), Nullable: false, TypeName: "int"},
+					{Name: "name", JsonKey: "jName", Default: trial.StringP(""), Nullable: false, TypeName: "string"},
+					{Name: "value", JsonKey: "jvalue", Default: trial.StringP("0.0"), Nullable: false, TypeName: "float"},
+				},
+				columns: []string{"id", "name", "value"},
+			},
+		},
 	}
 
 	trial.New(fn, cases).Test(t)
@@ -122,7 +142,7 @@ func TestMakeRow(t *testing.T) {
 	schema := []DbColumn{
 		{Name: "id", JsonKey: "id"},
 		{Name: "name", JsonKey: "name", Nullable: true},
-		{Name: "count", JsonKey: "count", TypeName: "int", Nullable: true},
+		{Name: "count", JsonKey: "count", TypeName: "int", Default: trial.StringP("0")},
 		{Name: "percent", JsonKey: "percent", TypeName: "float", Nullable: true},
 		{Name: "num", JsonKey: "num", TypeName: "int", Nullable: true},
 	}
@@ -162,9 +182,10 @@ func TestMakeRow(t *testing.T) {
 		},
 		"nulls": {
 			Input: map[string]interface{}{
-				"id": "1234",
+				"id":    "1234",
+				"count": "10",
 			},
-			Expected: Row{"1234", nil, nil, nil, nil},
+			Expected: Row{"1234", nil, int64(10), nil, nil},
 		},
 		"missing required": {
 			Input:       map[string]interface{}{},
@@ -177,6 +198,12 @@ func TestMakeRow(t *testing.T) {
 			},
 			ExpectedErr: errors.New("cannot convert number"),
 		},
+		"defaults": {
+			Input: map[string]interface{}{
+				"id": "1234",
+			},
+			Expected: Row{"1234", nil, "0", nil, nil},
+		},
 	}
 	trial.New(fn, cases).SubTest(t)
 }
@@ -189,8 +216,6 @@ func TestNewWorker(t *testing.T) {
 
 	type output struct {
 		Params     InfoURI
-		Invalid    bool
-		Msg        string
 		Count      int
 		DeleteStmt string
 	}
@@ -206,7 +231,9 @@ func TestNewWorker(t *testing.T) {
 	w.WriteLine([]byte(`{"test":"value1","testing":"value2","number":123}`))
 	w.Close()
 
-	d1, _ := filepath.Abs(f2)
+	s, _ := filepath.Abs(f2)
+	d1, _ := filepath.Split(s)
+	d1 += "*"
 
 	f3 := "./tmp1"
 	os.Mkdir(f3, 0755)
@@ -218,19 +245,20 @@ func TestNewWorker(t *testing.T) {
 		wrkr := i.options.newWorker(i.Info)
 		o := output{} // if task is invalid set values
-		o.Invalid, o.Msg = task.IsInvalidWorker(wrkr)
+		if invalid, msg := task.IsInvalidWorker(wrkr); invalid {
+			return nil, errors.New(msg)
+		}
 
 		// if the test isn't for an invalid worker set count and params
-		if !o.Invalid {
-			myw := wrkr.(*worker)
-			o.Params = myw.Params
-			o.Count = len(myw.flist)
-			o.DeleteStmt = myw.delQuery
+		myw := wrkr.(*worker)
+		o.Params = myw.Params
+		if myw.fReader != nil {
+			o.Count = int(myw.fReader.Stats().Files)
 		}
+		o.DeleteStmt = myw.delQuery
 
 		return o, nil
 	}
-	// testing cases
 	cases := trial.Cases{
 		"valid_worker": {
 			Input: input{options: &options{}, Info: d1 + "?table=schema.table_name"},
 			Expected: output{
 				Params: InfoURI{
 					FilePath:  d1,
 					Table:     "schema.table_name",
 					BatchSize: 1000,
 				},
-				Invalid: false,
-				Msg:     "",
-				Count:   1,
+				Count: 2,
 			},
 		},
 		"table_required": {
-			Input: input{options: &options{}, Info: "nothing"},
-			Expected: output{
-				Invalid: true,
-				Msg:     "params uri.unmarshal: table is required",
-			},
+			Input:       input{options: &options{}, Info: "nothing"},
+			ExpectedErr: errors.New("params uri.unmarshal: table is required"),
 		},
 		"invalid_path": {
-			Input: input{options: &options{}, Info: "missingfile.json?table=schema.table_name"},
-			Expected: output{
-				Invalid: true,
-				Msg:     "filepath os: stat missingfile.json: no such file or directory",
-			},
+			Input:       input{options: &options{}, Info: "missingfile.json?table=schema.table_name"},
+			ExpectedErr: errors.New("no files found for missingfile.json"),
 		},
 		"invalid_worker": {
-			Input: input{options: &options{}, Info: d2 + "?table=schema.table_name"},
-			Expected: output{
-				Params:  InfoURI{},
-				Invalid: true,
-				Msg:     "no files found in path " + d2,
-				Count:   0,
-			},
+			Input:       input{options: &options{}, Info: d2 + "?table=schema.table_name"},
+			ExpectedErr: errors.New("no files found for " + d2),
 		},
 		"valid_path_with_delete": {
@@ -283,8 +298,7 @@ func TestNewWorker(t *testing.T) {
 					DeleteMap: map[string]string{"date(hour_utc)": "2020-07-09", "id": "1572", "amt": "65.2154"},
 				},
 				DeleteStmt: "delete from schema.table_name where amt = 65.2154 and date(hour_utc) = '2020-07-09' and id = 1572",
-				Invalid:    false,
-				Count:      1,
+				Count:      2,
 			},
 		},
 	}
@@ -375,3 +389,82 @@ func TestCreateInserts(t *testing.T) {
 	}
 	trial.New(fn, cases).Timeout(5 * time.Second).SubTest(t)
 }
+
+func TestReadFiles(t *testing.T) {
+	c := trial.CaptureLog()
+	defer c.ReadAll()
+
+	type input struct {
+		lines      []string
+		skipErrors bool
+	}
+	type out struct {
+		rowCount  int32
+		skipCount int
+	}
+	fn := func(in trial.Input) (interface{}, error) {
+		i := in.Interface().(input)
+		ds := DataSet{
+			dbSchema: []DbColumn{
+				{Name: "id", JsonKey: "id"},
+				{Name: "name", JsonKey: "name", Nullable: true},
+				{Name: "count", JsonKey: "count", TypeName: "int", Nullable: true}},
+		}
+		reader := mock.NewReader("nop").AddLines(i.lines...)
+
+		rowChan := make(chan Row)
+		doneChan := make(chan struct{})
+		go func() {
+			for range rowChan {
+			}
+			close(doneChan)
+		}()
+		ds.ReadFiles(context.Background(), reader, rowChan, i.skipErrors)
+		<-doneChan
+		return out{rowCount: ds.rowCount, skipCount: ds.skipCount}, ds.err // number of rows or error
+	}
+	cases := trial.Cases{
+		"valid data": {
+			Input: input{
+				lines: []string{
+					`{"id":1, "name":"apple", "count":10}`,
+					`{"id":1, "name":"banana", "count":3}`,
+				},
+			},
+			Expected: out{rowCount: 2},
+		},
+		"invalid row": {
+			Input: input{
+				lines: []string{
+					`"id":1, "name":"apple", "count":10}`,
+					`{"id":1, "name":"banana", "count":3}`,
+				},
+			},
+			ShouldErr: true,
+		},
+		"all invalid": {
+			Input: input{
+				lines: []string{
+					`{`, `{`, `{`, `{`, `{`, `{`, `{`, `{`, `{`, `{`,
+					`{`, `{`, `{`, `{`, `{`, `{`, `{`, `{`, `{`, `{`,
+					`{`, `{`, `{`, `{`, `{`, `{`, `{`, `{`, `{`, `{`,
+				},
+			},
+			ShouldErr: true,
+		},
+		"skip invalids": {
+			Input: input{
+				lines: []string{
+					`{`, `{`, `{`, `{`, `{`,
+					`{"id":1, "name":"apple", "count":10}`,
+					`{"id":1, "name":"banana", "count":3}`,
+					`{"id":1, "name":"apple", "count":10}`,
+					`{"id":1, "name":"banana", "count":3}`,
+				},
+				skipErrors: true,
+			},
+			Expected: out{skipCount: 5, rowCount: 4},
+		},
+	}
+	trial.New(fn, cases).Timeout(5 * time.Second).SubTest(t)
+}
diff --git a/file/file_test.go b/file/file_test.go
index 149a9a29..8f450faf 100644
--- a/file/file_test.go
+++ b/file/file_test.go
@@ -121,7 +121,6 @@ func TestGlob_Local(t *testing.T) {
 		},
 	}
 	trial.New(fn, cases).SubTest(t)
-
 }
 
 func TestGlob_S3(t *testing.T) {
diff --git a/file/globreader.go b/file/globreader.go
index 73d32f97..4a3d5c1b 100644
--- a/file/globreader.go
+++ b/file/globreader.go
@@ -11,18 +11,20 @@ import (
 func NewGlobReader(path string, opts *Options) (_ Reader, err error) {
 	r := &GlobReader{
 		path: path,
-		opts: *opts,
 		sts: stat.Stats{
 			Path: path,
 		},
 	}
+	if opts != nil {
+		r.opts = *opts
+	}
 	if r.files, err = Glob(path, opts); err != nil {
 		return nil, err
 	}
 	if err := r.nextFile(); err != nil {
-		return nil, fmt.Errorf("no files (%d) found for %s", len(r.files), path)
+		return nil, fmt.Errorf("no files found for %s", path)
 	}
-
+	r.sts.Files = int64(len(r.files))
 	return r, nil
 }
 
@@ -54,7 +56,6 @@ func (g *GlobReader) nextFile() (err error) {
 	}
 	g.reader, err = NewReader(g.files[g.fileIndex].Path, &g.opts)
 	g.fileIndex++
-	g.sts.Files = int64(g.fileIndex)
 
 	return err
 }
@@ -75,7 +76,10 @@ func (g *GlobReader) Read(p []byte) (n int, err error) {
 }
 
 func (g *GlobReader) Close() error {
-	return g.reader.Close()
+	if g.reader != nil {
+		return g.reader.Close()
+	}
+	return nil
 }
 
 func (g *GlobReader) ReadLine() (b []byte, err error) {
@@ -100,7 +104,6 @@ func (g *GlobReader) Stats() stat.Stats {
 		sts.ByteCnt += s.ByteCnt
 		sts.LineCnt += s.LineCnt
 		sts.Size += s.Size
-		sts.Files++
 	}
 	return sts
}
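
A note for reviewers on the lockup itself: in the old ReadFiles, the reading goroutine blocked on wg.Wait() while the 20 unmarshal goroutines blocked sending to the bounded errChan (capacity 20). When every row errors, errChan fills before the workers can exit, so wg.Wait() never returns. The fix swaps the WaitGroup for an atomic counter so the consumer can keep draining errChan until every worker has exited. A minimal standalone sketch of the pattern; the jobs/errCh names and sizes are illustrative, not taken from the codebase:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	const workers = 4
	jobs := make(chan int, 16)
	errCh := make(chan error, 2) // deliberately small, like the bounded errChan in the patch

	// track live workers with an atomic counter instead of a WaitGroup
	var active int32 = workers
	for i := 0; i < workers; i++ {
		go func() {
			defer atomic.AddInt32(&active, -1)
			for j := range jobs {
				// every job "fails", mirroring the errs-on-every-row case
				errCh <- fmt.Errorf("row %d: bad json", j)
			}
		}()
	}

	for i := 0; i < 20; i++ {
		jobs <- i
	}
	close(jobs)

	// Keep draining errors while any worker is still running. A plain
	// wg.Wait() here would deadlock once errCh filled up, because the
	// workers would all be stuck on their sends.
	skipped := 0
	for {
		select {
		case <-errCh:
			skipped++
		default:
			if atomic.LoadInt32(&active) == 0 {
				fmt.Println("skipped rows:", skipped)
				return
			}
		}
	}
}
```

The drain loop spins between checks, which is tolerable here because it only runs while the unmarshal pool is shutting down.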
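On the default support: PrepareMeta now assigns a type-based zero value ("0", "0.0", or "") as the default for any non-nullable column that is explicitly remapped via the fields param, and MakeRow substitutes that default when the json key is missing instead of failing the row. A simplified sketch of the idea, using a trimmed-down stand-in for the package's DbColumn type rather than the real one:

```go
package main

import "fmt"

// DbColumn is a cut-down stand-in for the package's column metadata.
type DbColumn struct {
	Name     string
	JsonKey  string
	TypeName string
	Nullable bool
	Default  *string
}

// addDefaults mirrors the PrepareMeta change: a non-nullable column that is
// explicitly mapped gets a zero-value default so rows missing the key load
// instead of erroring.
func addDefaults(cols []DbColumn, mapped map[string]string) []DbColumn {
	for i, c := range cols {
		if jKey, ok := mapped[c.Name]; ok {
			cols[i].JsonKey = jKey
			if c.Default == nil && !c.Nullable {
				s := "" // string default
				switch c.TypeName {
				case "int":
					s = "0"
				case "float":
					s = "0.0"
				}
				cols[i].Default = &s
			}
		}
	}
	return cols
}

func main() {
	cols := []DbColumn{
		{Name: "id", TypeName: "int"},
		{Name: "value", TypeName: "float"},
		{Name: "name", TypeName: "string"},
	}
	cols = addDefaults(cols, map[string]string{"id": "json_id", "value": "jvalue", "name": "jName"})
	for _, c := range cols {
		fmt.Printf("%s -> %s, default %q\n", c.Name, c.JsonKey, *c.Default)
	}
}
```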
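On the globreader changes: the worker no longer stats the path and branches on file versus directory; NewGlobReader accepts a file, directory, or glob pattern, tolerates nil Options, sets Stats().Files once from the match count instead of incrementing it per open, and survives Close before any file was opened. Assuming ReadLine keeps advancing through the matched files until io.EOF, which is how the worker consumes it after this patch, usage looks roughly like the following; the ./data/*.json path is illustrative:

```go
package main

import (
	"fmt"
	"io"
	"log"

	"github.com/pcelvng/task-tools/file"
)

func main() {
	// nil options fall back to defaults now that NewGlobReader guards
	// against a nil *Options instead of dereferencing it.
	r, err := file.NewGlobReader("./data/*.json", nil)
	if err != nil {
		log.Fatal(err) // e.g. "no files found for ./data/*.json"
	}
	defer r.Close()

	for {
		line, err := r.ReadLine()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println(string(line))
	}

	sts := r.Stats()
	fmt.Printf("read %d lines from %d files\n", sts.LineCnt, sts.Files)
}
```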