Skip to content

Commit

Permalink
fix lockup on errs on every row (#151)
Browse files Browse the repository at this point in the history
* fix lockup on errs on every row

* add default support and unit tests

* fix globreader file stats and misc errors
  • Loading branch information
jbsmith7741 authored Mar 2, 2021
1 parent 8a2b4a8 commit ffcda64
Show file tree
Hide file tree
Showing 7 changed files with 236 additions and 151 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ apps/workers/deduper/deduper
apps/workers/sort2file/sort2file
apps/utils/file-watcher/file-watcher
*/stats/stats
*/sql-load/sql-load
apps/workers/sql-load/sql-load
build

coverage
57 changes: 27 additions & 30 deletions apps/taskmasters/flowlord/flowlord.drawio.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
10 changes: 5 additions & 5 deletions apps/workers/sql-load/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ type options struct {
MySQL bootstrap.DBOptions `toml:"mysql"`

sqlDB *sql.DB
// sqlxDB *sqlx.DB // used for running direct exec command

producer bus.Producer
fileOpts *file.Options
Expand All @@ -38,14 +37,15 @@ delete : allows insert into pre-existing data by deleting previous data.
- "?delete=date:2020-07-01|id:7"
truncate: allows insert into pre-existing table by truncating before insertion
fields : allows mapping different json key values to different database column names
- provide a list of field name mapping {json key name}:{DB column name} to be mapped
- ?fields=jsonKey:dbColumnName
- provide a list of field name mapping {DB column name}:{json key name} to be mapped
- ?fields=dbColumnName:jsonkey
cached_insert: improves insert times by caching data into a temp table
batch_size: (default:1000) number of rows to insert at a time (higher number increases memory usage)
Example task:
{"type":"sql_load","info":"gs://bucket/path/to/file.json?table=schema.table_name&delete=date:2020-07-01|id:7"}
{"type":"sql_load","info":"gs://bucket/path/of/files/to/load/?table=schema.table_name"}
{"type":"sql_load","info":"gs://bucket/path/to/file.json?table=schema.table_name&delete=date:2020-07-01|id:7&fields=jsonKeyValue:dbColumnName"}`
{"type":"sql_load","info":"gs://bucket/path/to/file.json?table=schema.table_name&delete=date:2020-07-01|id:7&fields=dbColumnName:jsonKeyValue"}`
)

func init() {
Expand Down
Loading

0 comments on commit ffcda64

Please sign in to comment.