Skip to content

Commit

Permalink
improve tests, table/column name qualifying, bcp command (#25)
Browse files Browse the repository at this point in the history
* improve tests, table/column name qualifying, bcp command

* update go.mod, bcp changes
  • Loading branch information
flarco authored Oct 14, 2023
1 parent b1136e1 commit 6cef4a5
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 15 deletions.
11 changes: 11 additions & 0 deletions cmd/sling/tests/files/test3.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[
{
"Col1": "--Geschenk-Gutschein 3--",
"Col2": {
"Col3": {
"@Col4": "2",
"Col-5": "G75118_2.jpg"
}
}
}
]
4 changes: 2 additions & 2 deletions cmd/sling/tests/tasks.tsv
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ task.12 file:///tmp/ocral_test_1000.csv {} POSTGRES_SSH public.{stream_file_pat
task.13 BIGQUERY select * from public.test1k where 1=1 {} POSTGRES public.test1k full-refresh {} {}
task.14 SNOWFLAKE select * from public.test1k where 1=1 {} POSTGRES public.test1k full-refresh {} {}
task.15 CLICKHOUSE select * from default.test1k where 1=1 {} POSTGRES public.test1k full-refresh {} {}
task.16 file://tests/files/test1.csv {} POSTGRES public.test1 full-refresh {"adjust_column_type":true, "add_new_columns":true} {}
task.17 file://tests/files/test1.upsert.csv {} POSTGRES public.test1 incremental id create_dt {"adjust_column_type":true, "add_new_columns":true} {} {"validation_file": "file://tests/files/test1.result.csv", "validation_cols": "0,1,2,3,4,6"}
task.16 file://tests/files/test1.csv {} POSTGRES public.test1 full-refresh {"adjust_column_type":true, "add_new_columns":true, "use_bulk": true} {}
task.17 file://tests/files/test1.upsert.csv {} POSTGRES public.test1 incremental id create_dt {"adjust_column_type":true, "add_new_columns":true, "use_bulk": true} {} {"validation_file": "file://tests/files/test1.result.csv", "validation_cols": "0,1,2,3,4,6"}
task.18 file://tests/files/test1.csv {} SNOWFLAKE public.test1 full-refresh {"adjust_column_type":true, "add_new_columns":true} {}
task.19 file://tests/files/test1.upsert.csv {} SNOWFLAKE public.test1 incremental id create_dt {"adjust_column_type":true, "add_new_columns":true} {} {"validation_file": "file://tests/files/test1.result.csv", "validation_cols": "0,1,2,3,4,6"}
task.20 file://tests/files/test1.csv {} BIGQUERY public.test1 full-refresh {"adjust_column_type":true, "add_new_columns":true} {}
Expand Down
32 changes: 20 additions & 12 deletions core/sling/task_run_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/flarco/dbio/filesys"
"github.com/flarco/dbio/iop"
"github.com/flarco/g"
"github.com/samber/lo"
"github.com/spf13/cast"
)

Expand Down Expand Up @@ -100,16 +101,29 @@ func (t *TaskExecution) WriteToDb(cfg *Config, df *iop.Dataflow, tgtConn databas

targetTable := cfg.Target.Object
if cfg.Target.Options.TableTmp == "" {
cfg.Target.Options.TableTmp = strings.ToLower(tgtConn.Unquote(targetTable))
tableTmp, err := database.ParseTableName(targetTable, tgtConn.GetType())
if err != nil {
return 0, g.Error(err, "no not parse object table name")
}

suffix := lo.Ternary(tgtConn.GetType().DBNameUpperCase(), "_TMP", "_tmp")
if g.In(tgtConn.GetType(), dbio.TypeDbOracle) {
if len(cfg.Target.Options.TableTmp) > 24 {
cfg.Target.Options.TableTmp = cfg.Target.Options.TableTmp[:24] // max is 30 chars
if len(tableTmp.Name) > 24 {
tableTmp.Name = tableTmp.Name[:24] // max is 30 chars
}

// some weird column / commit error, not picking up latest columns
cfg.Target.Options.TableTmp = cfg.Target.Options.TableTmp + "_tmp" + g.RandString(g.NumericRunes, 1) + strings.ToLower(g.RandString(g.AplhanumericRunes, 1))
} else {
cfg.Target.Options.TableTmp = cfg.Target.Options.TableTmp + "_tmp"
suffix2 := g.RandString(g.NumericRunes, 1) + g.RandString(g.AplhanumericRunes, 1)
suffix2 = lo.Ternary(
tgtConn.GetType().DBNameUpperCase(),
strings.ToUpper(suffix2),
strings.ToLower(suffix2),
)
suffix = suffix + suffix2
}

tableTmp.Name = tableTmp.Name + suffix
cfg.Target.Options.TableTmp = tableTmp.FullName()
}
if cfg.Mode == "" {
cfg.Mode = FullRefreshMode
Expand Down Expand Up @@ -170,12 +184,6 @@ func (t *TaskExecution) WriteToDb(cfg *Config, df *iop.Dataflow, tgtConn databas
if adjustColumnType {
df.OnColumnChanged = func(col iop.Column) error {

// sleep to allow transaction to close
// time.Sleep(100 * time.Millisecond)

// df.Context.Lock()
// defer df.Context.Unlock()

table, err := database.ParseTableName(cfg.Target.Options.TableTmp, tgtConn.GetType())
if err != nil {
return g.Error(err, "could not get temp table name for schema change")
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/c-bata/go-prompt v0.2.6
github.com/denisbrodbeck/machineid v1.0.1
github.com/dustin/go-humanize v1.0.0
github.com/flarco/dbio v0.3.292
github.com/flarco/dbio v0.3.297
github.com/flarco/g v0.1.63
github.com/getsentry/sentry-go v0.11.0
github.com/google/uuid v1.3.0
Expand Down
1 change: 1 addition & 0 deletions scripts/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ go build -o sling-linux cmd/sling/*.go && chmod +x sling-linux

cat cmd/sling/tests/files/test1.1.csv | ./sling-linux run --tgt-conn POSTGRES --tgt-object public.my_table --mode full-refresh
cat cmd/sling/tests/files/test1.1.csv.gz | ./sling-linux run --tgt-conn POSTGRES --tgt-object public.my_table --mode full-refresh
cat cmd/sling/tests/files/test3.json | ./sling-linux run --src-options "flatten: true" --tgt-conn POSTGRES --tgt-object public.my_table1 --tgt-options 'use_bulk: false' --mode full-refresh
./sling-linux run --src-conn POSTGRES --src-stream public.my_table --stdout > /tmp/my_table.csv
./sling-linux run --src-conn POSTGRES --src-stream public.my_table --tgt-object file:///tmp/my_table.csv
./sling-linux run -r cmd/sling/tests/replications/r.05.yaml
Expand Down

0 comments on commit 6cef4a5

Please sign in to comment.