Skip to content

Commit

Permalink
BigQuery generic loader (#125)
Browse files Browse the repository at this point in the history
* initial bq_load worker

* add truncate, append and delete options

* add app descriptions

* update go modules

* add unit tests

* add lazy column names in sql-read
  • Loading branch information
jbsmith7741 authored Aug 7, 2020
1 parent 460c8a4 commit 51c4fb1
Show file tree
Hide file tree
Showing 8 changed files with 515 additions and 94 deletions.
61 changes: 61 additions & 0 deletions apps/workers/bq_load/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package main

import (
"errors"
"strings"

tools "github.com/pcelvng/task-tools"
"github.com/pcelvng/task-tools/bootstrap"
)

const (
taskType = "bq_load"
desc = `load a delimited json file into BigQuery
info params
- origin: (required) file to be loaded (gs://path/file.json)
- destination: (required) project.dataset.table to be insert into
- truncate: truncate the table (delete ALL and insert). Default behavior is to append data
- delete: map field defines the column and values to delete before inserting (delete=id:10|date:2020-01-02)
example
{"task":"bq_load", "info":"gs://my/data.json?destination=project.reports.impressions&delete=date:2020-01-02|id:11"}`
)

type options struct {
BqAuth string `toml:"bq_auth" comment:"file path to service file"`
}

func (o *options) Validate() error {
return nil
}

func main() {
opts := &options{}
app := bootstrap.NewWorkerApp(taskType, opts.NewWorker, opts).
Description(desc).
Version(tools.Version).Initialize()

app.Run()
}

type Destination struct {
Project string
Dataset string
Table string
}

func (d *Destination) UnmarshalText(text []byte) error {
l := strings.Split(string(text), ".")
if len(l) != 3 || len(l[0]) == 0 || len(l[1]) == 0 || len(l[2]) == 0 {
return errors.New("requires (project.dataset.table)")
}

d.Project, d.Dataset, d.Table = l[0], l[1], l[2]
return nil
}

func (d Destination) String() string {
return d.Project + "." + d.Dataset + "." + d.Table
}
119 changes: 119 additions & 0 deletions apps/workers/bq_load/worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package main

import (
"context"
"fmt"
"sort"
"strconv"
"strings"

"cloud.google.com/go/bigquery"
"github.com/dustin/go-humanize"
"github.com/jbsmith7741/uri"
"github.com/pcelvng/task"
"google.golang.org/api/option"
)

type worker struct {
task.Meta
options

Destination `uri:"dest_table" required:"true"`
File string `uri:"origin" required:"true"`
Truncate bool `uri:"truncate"`
Append bool `uri:"append"`
DeleteMap map[string]string `uri:"delete"` // will replace the data by removing current data
delete bool
}

func (o *options) NewWorker(info string) task.Worker {
w := &worker{
Meta: task.NewMeta(),
options: *o,
}
err := uri.Unmarshal(info, w)
if err != nil {
return task.InvalidWorker(err.Error())
}

// verify options
w.delete = len(w.DeleteMap) > 0
if w.delete && w.Truncate {
return task.InvalidWorker("truncate and delete options must be selected independently")
}

if !(w.delete || w.Truncate || w.Append) {
return task.InvalidWorker("insert rule required (append|truncate|delete)")
}
if w.delete {
w.Append = true
}

return w
}

func (w *worker) DoTask(ctx context.Context) (task.Result, string) {
opts := make([]option.ClientOption, 0)
if w.BqAuth != "" {
opts = append(opts, option.WithCredentialsFile(w.BqAuth))
}
client, err := bigquery.NewClient(ctx, w.Project, opts...)
if err != nil {
return task.Failf("bigquery client init %s", err)
}

bqRef := bigquery.NewGCSReference(w.File)
bqRef.SourceFormat = bigquery.JSON
bqRef.MaxBadRecords = 1

loader := client.Dataset(w.Dataset).Table(w.Table).LoaderFrom(bqRef)
loader.WriteDisposition = bigquery.WriteAppend
if len(w.DeleteMap) > 0 {
q := delStatement(w.DeleteMap, w.Destination)
j, err := client.Query(q).Run(ctx)
if err != nil {
return task.Failf("delete statement: %s", err)
}
status, err := j.Wait(ctx)
if err != nil {
return task.Failf("delete wait: %s", err)
}
if status.Err() != nil {
return task.Failf("delete: %s", err)
}
status = j.LastStatus()
if qSts, ok := status.Statistics.Details.(*bigquery.QueryStatistics); ok {
w.SetMeta("rows_del", strconv.FormatInt(qSts.NumDMLAffectedRows, 10))
}
}

if w.Truncate {
loader.WriteDisposition = bigquery.WriteTruncate
}

job, err := loader.Run(ctx)
if err != nil {
return task.Failf("loader run: %s", err)
}
status, err := job.Wait(ctx)
if err == nil {
if status.Err() != nil {
return task.Failf("job completed with error: %v", status.Err())
}
if sts, ok := status.Statistics.Details.(*bigquery.LoadStatistics); ok {
w.SetMeta("rows_insert", strconv.FormatInt(sts.OutputRows, 10))
return task.Completed("%d rows (%s) loaded", sts.OutputRows, humanize.Bytes(uint64(sts.OutputBytes)))
}
}

return task.Completed("completed")
}

func delStatement(m map[string]string, d Destination) string {
s := make([]string, 0)
for k, v := range m {
s = append(s, k+" = "+v)
}
sort.Sort(sort.StringSlice(s))
return fmt.Sprintf("delete from `%s.%s.%s` where %s", d.Project, d.Dataset, d.Table, strings.Join(s, " and "))
}
69 changes: 69 additions & 0 deletions apps/workers/bq_load/worker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package main

import (
"errors"
"testing"

"github.com/hydronica/trial"
"github.com/pcelvng/task"
)

func TestNewWorker(t *testing.T) {
fn := func(i trial.Input) (interface{}, error) {
o := &options{}
w := o.NewWorker(i.String())
if invalid, s := task.IsInvalidWorker(w); invalid {
return nil, errors.New(s)
}

return w, nil
}
cases := trial.Cases{
"required fields": {
Input: "",
ExpectedErr: errors.New("origin is required"),
},
"invalid destination": {
Input: "gs://file.json?dest_table=apple",
ExpectedErr: errors.New("requires (project.dataset.table)"),
},
"missing insert rule": {
Input: "gs://file.json?dest_table=p.d.t",
ExpectedErr: errors.New("insert rule required"),
},
"append": {
Input: "gs://file.json?dest_table=p.d.t&append",
Expected: &worker{
Meta: task.NewMeta(),
File: "gs://file.json",
Destination: Destination{"p", "d", "t"},
Append: true,
},
},
"truncate": {
Input: "gs://file.json?dest_table=p.d.t&truncate",
Expected: &worker{
Meta: task.NewMeta(),
File: "gs://file.json",
Destination: Destination{"p", "d", "t"},
Truncate: true,
},
},
"delete": {
Input: "gs://file.json?dest_table=p.d.t&delete=id:10",
Expected: &worker{
Meta: task.NewMeta(),
File: "gs://file.json",
Destination: Destination{"p", "d", "t"},
delete: true,
Append: true,
DeleteMap: map[string]string{"id": "10"},
},
},
"invalid delete": {
Input: "gs://file.json?dest_table=p.d.t&delete=id:10&truncate",
ExpectedErr: errors.New("truncate and delete"),
},
}
trial.New(fn, cases).Test(t)
}
11 changes: 10 additions & 1 deletion apps/workers/sql_read/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,16 @@ import (

const (
taskType = "sql_read"
desc = ``
desc = `extract data from a mysql or postgres table
info params
- origin: (alternative to field) - path to a file containing a sql query to extract the date
- dest: (required) - file path to where the file should be written
- table: (required with field) - table (schema.table) to read from
- field: (alternative to origin) - list of columns to read from and the json field that should be used to write the values.
example
{"task":"sql_read","info":"?dest=./data.json&table=report.impressions&field=id:my_id|date:date"}`
)

type options struct {
Expand Down
51 changes: 4 additions & 47 deletions apps/workers/sql_read/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type FieldMap map[string]string
func (o *options) NewWorker(info string) task.Worker {
// unmarshal info string
iOpts := struct {
Table string `uri:"table" required:"true"`
Table string `uri:"table"`
QueryFile string `uri:"origin"` // path to query file
Fields map[string]string `uri:"field"`
Destination string `uri:"dest" required:"true"`
Expand Down Expand Up @@ -83,52 +83,6 @@ func (o *options) NewWorker(info string) task.Worker {
}
}

/*
type Field struct {
DataType string
Name string
}
func getTableInfo(db *sqlx.DB, table string) (map[string]*Field, error) {
// pull info about table
s := strings.Split(table, ".")
if len(s) != 2 {
return nil, errors.New("table requires schema and table (schema.table)")
}
rows, err := db.Query("SELECT column_name, data_type\n FROM information_schema.columns WHERE table_schema = ? AND table_name = ?", s[0], s[1])
if err != nil {
return nil, err
}
fields := make(map[string]*Field)
defer rows.Close()
for rows.Next() {
var name, dType string
if err = rows.Scan(&name, &dType); err != nil {
return nil, err
}
if strings.Contains(dType, "char") || strings.Contains(dType, "text") {
dType = "string"
}
if strings.Contains(dType, "int") || strings.Contains(dType, "serial") {
dType = "int"
}
if strings.Contains(dType, "numeric") || strings.Contains(dType, "dec") ||
strings.Contains(dType, "double") || strings.Contains(dType, "real") ||
strings.Contains(dType, "fixed") || strings.Contains(dType, "float") {
dType = "float"
}
fields[name] = &Field{Name: name, DataType: dType}
}
return fields, rows.Close()
} */

func (w *worker) DoTask(ctx context.Context) (task.Result, string) {
// pull Data from mysql database
rows, err := w.db.QueryxContext(ctx, w.Query)
Expand Down Expand Up @@ -173,6 +127,9 @@ func (m FieldMap) convertRow(data map[string]interface{}) map[string]interface{}
result := make(map[string]interface{})
for key, value := range data {
name := m[key]
if name == "" {
name = key
}
switch v := value.(type) {
case []byte:
s := string(v)
Expand Down
17 changes: 17 additions & 0 deletions apps/workers/sql_read/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func TestNewWorker(t *testing.T) {
Input: tFile + "?table=schema.table&dest=nop://",
Expected: "select * from fake_table;",
},
"lazy maps": {
Input: "?table=schema.table&dest=nop://&field=id:|name:|value:fruit",
Expected: "select id, name, value from schema.table",
},
"no query": {
Input: "?table=schema.table&dest=nop://",
ShouldErr: true,
Expand Down Expand Up @@ -114,6 +118,19 @@ func TestWorker_DoTask(t *testing.T) {
`{"fruit":"banana"}`,
},
},
"lazy map": {
Input: input{
fields: FieldMap{"id": ""},
Rows: [][]driver.Value{
{1},
{2},
},
},
Expected: []string{
`{"id":1}`,
`{"id":2}`,
},
},
"write fail": {
Input: input{
wPath: "nop://writeline_err",
Expand Down
Loading

0 comments on commit 51c4fb1

Please sign in to comment.