Skip to content

Commit

Permalink
switch to scanner to avoid empty lines (#152)
Browse files Browse the repository at this point in the history
  • Loading branch information
jbsmith7741 authored Mar 3, 2021
1 parent ffcda64 commit 5b9f125
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 23 deletions.
2 changes: 1 addition & 1 deletion apps/workers/sql-load/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ fields : allows mapping different json key values to different database column n
- provide a list of field name mapping {DB column name}:{json key name} to be mapped
- ?fields=dbColumnName:jsonkey
cached_insert: improves insert times by caching data into a temp table
batch_size: (default:1000) number of rows to insert at a time (higher number increases memory usage)
batch_size: (default:10000) number of rows to insert at a time (higher number increases memory usage)
Example task:
{"type":"sql_load","info":"gs://bucket/path/to/file.json?table=schema.table_name&delete=date:2020-07-01|id:7"}
Expand Down
35 changes: 15 additions & 20 deletions apps/workers/sql-load/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"database/sql"
"fmt"
"io"
"log"
"math/rand"
"sort"
Expand All @@ -23,14 +22,14 @@ import (
)

type InfoURI struct {
FilePath string `uri:"origin"` // file path to load one file or a list of files in that path (not recursive)
Table string `uri:"table" required:"true"` // insert table name i.e., "schema.table_name"
SkipErr bool `uri:"skip_err"` // if bad records are found they are skipped and logged instead of throwing an error
DeleteMap map[string]string `uri:"delete"` // map used to build the delete query statement
FieldsMap map[string]string `uri:"fields"` // map json key values to different db names
Truncate bool `uri:"truncate"` // truncate the table rather than delete
CachedInsert bool `uri:"cached_insert"` // this will attempt to load the query data though a temp table (postgres only)
BatchSize int `uri:"batch_size" default:"1000"` // number of rows to insert at once
FilePath string `uri:"origin"` // file path to load one file or a list of files in that path (not recursive)
Table string `uri:"table" required:"true"` // insert table name i.e., "schema.table_name"
SkipErr bool `uri:"skip_err"` // if bad records are found they are skipped and logged instead of throwing an error
DeleteMap map[string]string `uri:"delete"` // map used to build the delete query statement
FieldsMap map[string]string `uri:"fields"` // map json key values to different db names
Truncate bool `uri:"truncate"` // truncate the table rather than delete
CachedInsert bool `uri:"cached_insert"` // this will attempt to load the query data though a temp table (postgres only)
BatchSize int `uri:"batch_size" default:"10000"` // number of rows to insert at once
}

type worker struct {
Expand Down Expand Up @@ -324,8 +323,9 @@ func (ds *DataSet) ReadFiles(ctx context.Context, files file.Reader, rowChan cha
}

// read the lines of the file
scanner := file.NewScanner(files)
loop:
for {
for scanner.Scan() {
select {
case <-ctx.Done():
break loop
Expand All @@ -338,17 +338,12 @@ loop:
break loop
}
default:
line, err := files.ReadLine()
if err != nil {
if err == io.EOF {
break loop
}
errChan <- fmt.Errorf("readline error %v - %w", files.Stats().Path, err)
continue
}
dataIn <- line
dataIn <- scanner.Bytes()
}
}
if scanner.Err() != nil {
ds.err = scanner.Err()
}
files.Close() // close the reader
sts := files.Stats()
log.Printf("processed %d files at %s", sts.Files, sts.Path)
Expand Down Expand Up @@ -496,7 +491,7 @@ func RandString(n int) string {

func CreateInserts(rowChan chan Row, queryChan chan string, tableName string, cols []string, batchSize int) {
if batchSize < 0 {
batchSize = 1
batchSize = 10000
}

fields := strings.Join(cols, ",")
Expand Down
4 changes: 2 additions & 2 deletions apps/workers/sql-load/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func TestNewWorker(t *testing.T) {
Params: InfoURI{
FilePath: d1,
Table: "schema.table_name",
BatchSize: 1000,
BatchSize: 10000,
},
Count: 2,
},
Expand All @@ -294,7 +294,7 @@ func TestNewWorker(t *testing.T) {
Params: InfoURI{
FilePath: d1,
Table: "schema.table_name",
BatchSize: 1000,
BatchSize: 10000,
DeleteMap: map[string]string{"date(hour_utc)": "2020-07-09", "id": "1572", "amt": "65.2154"},
},
DeleteStmt: "delete from schema.table_name where amt = 65.2154 and date(hour_utc) = '2020-07-09' and id = 1572",
Expand Down

0 comments on commit 5b9f125

Please sign in to comment.