Skip to content

Commit

Permalink
Sql read (#124)
Browse files Browse the repository at this point in the history
* first draft of sql_read worker

* add unit tests

* add postgres datatypes

* simplify read logic

* fix unreliable unit test
  • Loading branch information
jbsmith7741 authored Aug 5, 2020
1 parent 2721991 commit 460c8a4
Show file tree
Hide file tree
Showing 7 changed files with 440 additions and 3 deletions.
2 changes: 1 addition & 1 deletion apps/workers/sql_load/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (w *worker) DoTask(ctx context.Context) (task.Result, string) {
w.dbDriver, w.Params.Table, w.records)
}

// Queries the database for the table schema for each column
// QuerySchema queries the database for the table schema for each column
// sets the worker's db value
func (w *worker) QuerySchema() (err error) {
var t, s string // table and schema
Expand Down
99 changes: 99 additions & 0 deletions apps/workers/sql_read/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package main

import (
"fmt"
"log"
"strings"

"github.com/jbsmith7741/go-tools/appenderr"
"github.com/jmoiron/sqlx"

tools "github.com/pcelvng/task-tools"
"github.com/pcelvng/task-tools/bootstrap"
"github.com/pcelvng/task-tools/file"
)

const (
// taskType is the task type name handed to bootstrap.NewWorkerApp in main.
taskType = "sql_read"
// desc is the application description passed to Description in main; it is currently empty.
desc = ``
)

// options holds the application configuration and the shared database handle
// used to build workers.
type options struct {
// DBOptions are the database connection settings (tagged with the toml key "mysql").
DBOptions `toml:"mysql"`

// FOpts are the file options passed to file.NewReader and file.NewWriter in NewWorker.
FOpts *file.Options `toml:"file"`
// db is the database handle set by connectDB and copied into every worker.
db *sqlx.DB
}

// DBOptions describes how to reach the database; connectDB turns these values
// into a driver-specific DSN.
type DBOptions struct {
// Type selects the DSN format in connectDB: "mysql" or "postgres".
Type string `toml:"type" commented:"true"`
// Username is the database user; for postgres it is omitted from the DSN when empty.
Username string `toml:"username" commented:"true"`
// Password is the database password; for postgres it is omitted from the DSN when empty.
Password string `toml:"password" commented:"true"`
// Host is the database address; Validate rejects an empty value.
Host string `toml:"host" comment:"host can be 'host:port', 'host', 'host:' or ':port'"`
// DBName is the name of the database to connect to; Validate rejects an empty value.
DBName string `toml:"dbname"`
}

// Validate reports every missing required database setting at once.
// It returns nil when both the host and the database name are set.
func (o *options) Validate() error {
	problems := appenderr.New()

	// Collect all problems instead of stopping at the first one.
	if len(o.Host) == 0 {
		problems.Addf("missing db host")
	}
	if len(o.DBName) == 0 {
		problems.Addf("missing db name")
	}

	return problems.ErrOrNil()
}

// connectDB builds a driver-specific DSN from the configured options, opens a
// database handle with the matching driver and stores it in o.db.
//
// Supported values of o.Type are "mysql" and "postgres"; any other value
// yields an "unknown db type" error. For postgres, o.Host may be "host",
// "host:port", "host:" or ":port"; an empty host or port is left out of the
// DSN so the driver falls back to its own default.
//
// NOTE(review): only the mysql driver import is visible in this package; a
// driver registered under the name "postgres" must be linked in for the
// postgres type to work — confirm.
func (o *options) connectDB() (err error) {
	var dsn string
	switch o.Type {
	case "mysql":
		dsn = fmt.Sprintf("%s:%s@tcp(%s)/%s?parseTime=true", o.Username, o.Password, o.Host, o.DBName)
	case "postgres":
		host, port := o.Host, ""
		if v := strings.Split(o.Host, ":"); len(v) > 1 {
			host, port = v[0], v[1]
		}

		dsn = fmt.Sprintf("dbname=%s sslmode=disable", o.DBName)
		if host != "" {
			// Only emit host when set: a bare "host=" (from ":port") has no
			// value and can make the next key be read as the host.
			dsn = "host=" + host + " " + dsn
		}
		if o.Username != "" {
			dsn += " user=" + o.Username
		}
		if o.Password != "" {
			dsn += " password=" + o.Password
		}
		if port != "" {
			dsn += " port=" + port
		}
	default:
		return fmt.Errorf("unknown db type %s", o.Type)
	}

	// The driver name must match the DSN format built above; o.Type is
	// guaranteed to be one of the supported names at this point.
	o.db, err = sqlx.Open(o.Type, dsn)
	return err
}

// main wires up the sql_read worker application: it registers the options
// (pre-filled with placeholder database settings), initializes the app,
// opens the database handle and then runs the worker loop.
func main() {
	opts := &options{
		FOpts: file.NewOptions(),
		DBOptions: DBOptions{
			Type:     "mysql",
			Username: "user",
			Password: "pass",
			Host:     "127.0.0.1:3306",
			DBName:   "db",
		},
	}
	app := bootstrap.NewWorkerApp(taskType, opts.NewWorker, opts).
		Description(desc).
		Version(tools.Version)

	app.Initialize()

	// setup database connection; done after Initialize so opts carries the
	// loaded configuration (presumably populated by Initialize — confirm).
	if err := opts.connectDB(); err != nil {
		// log.Fatal formats like fmt.Sprint, which adds no space next to a
		// string operand, so the separator is part of the message.
		log.Fatal("db connect: ", err)
	}

	app.Run()
}
186 changes: 186 additions & 0 deletions apps/workers/sql_read/worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package main

import (
"context"
"fmt"
"io/ioutil"
"strings"

"github.com/dustin/go-humanize"
_ "github.com/go-sql-driver/mysql"
"github.com/jbsmith7741/uri"
"github.com/jmoiron/sqlx"
jsoniter "github.com/json-iterator/go"
"github.com/pcelvng/task"
"github.com/pcelvng/task-tools/file"
)

// worker executes a single sql_read task: DoTask runs Query on db and writes
// each resulting row to writer as one JSON-encoded line.
type worker struct {
task.Meta

// db is the database handle the query is executed on (shared from options).
db *sqlx.DB
// writer is the destination file writer created in NewWorker.
writer file.Writer

// Fields maps result column names to the field names used in the output.
Fields FieldMap
// Query is the SQL statement executed by DoTask.
Query string
}

// FieldMap maps a database column name (key) to the field name written to
// the output record (value); see convertRow.
type FieldMap map[string]string

// NewWorker parses the task info string and returns a worker ready to run
// the described query, or an invalid worker describing what is wrong.
//
// Recognized info parameters:
//   - table  (required): "schema.table" to select from when fields are given
//   - field:  map of column name to output field name; the keys form the
//     select list of a generated query
//   - origin: path to a file holding the query; when set it replaces the
//     generated query
//   - dest   (required): path the result rows are written to
//
// At least one of field or origin must be provided.
func (o *options) NewWorker(info string) task.Worker {
	// unmarshal info string
	iOpts := struct {
		Table       string            `uri:"table" required:"true"`
		QueryFile   string            `uri:"origin"` // path to query file
		Fields      map[string]string `uri:"field"`
		Destination string            `uri:"dest" required:"true"`
	}{}
	if err := uri.Unmarshal(info, &iOpts); err != nil {
		// Pass the error as an argument so a '%' in its text is not
		// interpreted as a format verb.
		return task.InvalidWorker("%s", err.Error())
	}

	var query string
	// build the query from the requested fields
	if len(iOpts.Fields) > 0 {
		if s := strings.Split(iOpts.Table, "."); len(s) != 2 {
			return task.InvalidWorker("invalid table %s (schema.table)", iOpts.Table)
		}
		cols := make([]string, 0, len(iOpts.Fields))
		for k := range iOpts.Fields {
			cols = append(cols, k)
		}
		query = fmt.Sprintf("select %s from %s", strings.Join(cols, ", "), iOpts.Table)
	}

	// a query file takes precedence over the generated query
	if iOpts.QueryFile != "" {
		r, err := file.NewReader(iOpts.QueryFile, o.FOpts)
		if err != nil {
			return task.InvalidWorker("%s", err.Error())
		}
		b, err := ioutil.ReadAll(r)
		// Release the reader whether or not the read succeeded; a close
		// error on a read-only handle carries no extra information here.
		r.Close()
		if err != nil {
			return task.InvalidWorker("%s", err.Error())
		}
		query = string(b)
	}

	if query == "" {
		return task.InvalidWorker("query path or field params required")
	}

	w, err := file.NewWriter(iOpts.Destination, o.FOpts)
	if err != nil {
		return task.InvalidWorker("writer: %s", err)
	}

	return &worker{
		Meta:   task.NewMeta(),
		db:     o.db,
		Fields: iOpts.Fields,
		Query:  query,
		writer: w,
	}
}

/*
type Field struct {
DataType string
Name string
}
func getTableInfo(db *sqlx.DB, table string) (map[string]*Field, error) {
// pull info about table
s := strings.Split(table, ".")
if len(s) != 2 {
return nil, errors.New("table requires schema and table (schema.table)")
}
rows, err := db.Query("SELECT column_name, data_type\n FROM information_schema.columns WHERE table_schema = ? AND table_name = ?", s[0], s[1])
if err != nil {
return nil, err
}
fields := make(map[string]*Field)
defer rows.Close()
for rows.Next() {
var name, dType string
if err = rows.Scan(&name, &dType); err != nil {
return nil, err
}
if strings.Contains(dType, "char") || strings.Contains(dType, "text") {
dType = "string"
}
if strings.Contains(dType, "int") || strings.Contains(dType, "serial") {
dType = "int"
}
if strings.Contains(dType, "numeric") || strings.Contains(dType, "dec") ||
strings.Contains(dType, "double") || strings.Contains(dType, "real") ||
strings.Contains(dType, "fixed") || strings.Contains(dType, "float") {
dType = "float"
}
fields[name] = &Field{Name: name, DataType: dType}
}
return fields, rows.Close()
} */

// DoTask runs the worker's query and writes every result row to the
// destination as one JSON-encoded line.
//
// The task is reported as interrupted when ctx is done between rows, failed
// on any query, scan, encoding or write error, and completed with the line
// count, path and size of the written file otherwise. On interruption or
// failure before the writer is closed, the partial output is aborted.
func (w *worker) DoTask(ctx context.Context) (task.Result, string) {
	// pull data from the database
	rows, err := w.db.QueryxContext(ctx, w.Query)
	if err != nil {
		w.writer.Abort()
		return task.Failed(err)
	}
	// Release the result set on every return path; the explicit Close below
	// still reports the close error on the success path.
	defer rows.Close()

	for rows.Next() {
		if task.IsDone(ctx) {
			w.writer.Abort()
			return task.Interrupted()
		}
		row := make(map[string]interface{})
		if err := rows.MapScan(row); err != nil {
			w.writer.Abort()
			return task.Failf("mapscan %s", err)
		}

		r := w.Fields.convertRow(row)
		b, err := jsoniter.Marshal(r)
		if err != nil {
			w.writer.Abort()
			return task.Failed(err)
		}
		if err := w.writer.WriteLine(b); err != nil {
			w.writer.Abort()
			return task.Failed(err)
		}
	}
	// Next returns false both at the end of the result set and on error;
	// without this check a broken iteration would be reported as success.
	if err := rows.Err(); err != nil {
		w.writer.Abort()
		return task.Failed(err)
	}
	if err := rows.Close(); err != nil {
		w.writer.Abort()
		return task.Failed(err)
	}

	// finalize the output file
	if err := w.writer.Close(); err != nil {
		return task.Failed(err)
	}

	sts := w.writer.Stats()
	w.SetMeta("file", sts.Path)

	return task.Completed("%d rows written to %s (%s)", sts.LineCnt, sts.Path, humanize.Bytes(uint64(sts.ByteCnt)))
}

// convertRow renames the columns of a scanned row according to the field map
// and returns the result as a new map; data is not modified.
//
// A column with no mapping (or an empty mapping) keeps its own name, so rows
// from a query without field params are passed through instead of every
// column collapsing onto the empty key. Values of type []byte are converted
// to string; all other values are copied as-is.
func (m FieldMap) convertRow(data map[string]interface{}) map[string]interface{} {
	result := make(map[string]interface{}, len(data))
	for key, value := range data {
		// Indexing a nil map is safe and yields "", which triggers the fallback.
		name := m[key]
		if name == "" {
			name = key
		}
		if raw, ok := value.([]byte); ok {
			result[name] = string(raw)
			continue
		}
		result[name] = value
	}

	return result
}
Loading

0 comments on commit 460c8a4

Please sign in to comment.