Skip to content

Commit

Permalink
Merge pull request #19 from bmeg/table-write-config
Browse files Browse the repository at this point in the history
Table Output Config
  • Loading branch information
kellrott authored Aug 4, 2020
2 parents 41f8a9d + 969b1ed commit c278231
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 11 deletions.
13 changes: 9 additions & 4 deletions emitter/dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log"
"fmt"
"sync"
"strings"
"compress/gzip"
"encoding/json"
"encoding/csv"
Expand Down Expand Up @@ -102,13 +103,17 @@ func (s *dirTableEmitter) Close() {
}


func (s *DirEmitter) EmitTable( prefix string, columns []string ) TableEmitter {
path := filepath.Join(s.dir, fmt.Sprintf("%s.table.gz", prefix))
func (s *DirEmitter) EmitTable( name string, columns []string, sep rune ) TableEmitter {
path := filepath.Join(s.dir, name)
te := dirTableEmitter{}
te.handle, _ = os.Create(path)
te.out = gzip.NewWriter(te.handle)
if strings.HasSuffix(name, ".gz") {
te.out = gzip.NewWriter(te.handle)
} else {
te.out = te.handle
}
te.writer = csv.NewWriter(te.out)
te.writer.Comma = '\t'
te.writer.Comma = sep
te.columns = columns
te.writer.Write(te.columns)
return &te
Expand Down
2 changes: 1 addition & 1 deletion emitter/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type TableEmitter interface {

type Emitter interface {
EmitObject(prefix string, objClass string, e map[string]interface{}) error
EmitTable(prefix string, columns []string) TableEmitter
EmitTable(prefix string, columns []string, sep rune) TableEmitter
Close()
}

Expand Down
2 changes: 1 addition & 1 deletion emitter/stdout.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (s *stdTableEmitter) EmitRow(i map[string]interface{}) error {

func (s *stdTableEmitter) Close() {}

func (s StdoutEmitter) EmitTable( prefix string, columns []string ) TableEmitter {
func (s StdoutEmitter) EmitTable( prefix string, columns []string, sep rune ) TableEmitter {
te := stdTableEmitter{columns}
fmt.Printf("%s\n", columns)
return &te
Expand Down
4 changes: 2 additions & 2 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func (run *Runtime) EmitObject(prefix string, c string, o map[string]interface{}
}


func (run *Runtime) EmitTable(prefix string, columns []string) emitter.TableEmitter {
return run.output.EmitTable(prefix, columns)
func (run *Runtime) EmitTable(prefix string, columns []string, sep rune) emitter.TableEmitter {
return run.output.EmitTable(prefix, columns, sep)
}

func (m *Runtime) Printf(s string, x ...interface{}) {
Expand Down
4 changes: 2 additions & 2 deletions pipeline/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ func (m *Task) EmitObject(prefix string, c string, e map[string]interface{}) err
return m.Runtime.EmitObject(prefix, c, e)
}

func (m *Task) EmitTable(prefix string, columns []string) emitter.TableEmitter {
return m.Runtime.EmitTable(prefix, columns)
func (m *Task) EmitTable(prefix string, columns []string, sep rune) emitter.TableEmitter {
return m.Runtime.EmitTable(prefix, columns, sep)
}

func (m *Task) Output(name string, value string) error {
Expand Down
7 changes: 6 additions & 1 deletion transform/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
type TableWriteStep struct {
Output string `json:"output" jsonschema_description:"Name of file to create"`
Columns []string `json:"columns" jsonschema_description:"Columns to be written into table file"`
Sep string `json:"sep"`
emit emitter.TableEmitter
}

Expand All @@ -34,7 +35,11 @@ type TableProjectStep struct {
}

func (tw *TableWriteStep) Init(task *pipeline.Task) {
tw.emit = task.Runtime.EmitTable(tw.Output, tw.Columns)
sep := '\t'
if tw.Sep != "" {
sep = rune(tw.Sep[0])
}
tw.emit = task.Runtime.EmitTable(tw.Output, tw.Columns, sep)
}

func (tw *TableWriteStep) Run(i map[string]interface{}, task *pipeline.Task) map[string]interface{} {
Expand Down

0 comments on commit c278231

Please sign in to comment.