From 6cef4a59331dd9f291b2eb8a39634e645fb0105d Mon Sep 17 00:00:00 2001
From: Fritz Larco
Date: Sat, 14 Oct 2023 10:03:03 -0300
Subject: [PATCH] improve tests, table/column name qualifying, bcp command (#25)

* improve tests, table/column name qualifying, bcp command

* update go.mod, bcp changes
---
 cmd/sling/tests/files/test3.json | 11 +++++++++++
 cmd/sling/tests/tasks.tsv        |  4 ++--
 core/sling/task_run_write.go     | 32 ++++++++++++++++++++------------
 go.mod                           |  2 +-
 scripts/test.sh                  |  1 +
 5 files changed, 35 insertions(+), 15 deletions(-)
 create mode 100644 cmd/sling/tests/files/test3.json

diff --git a/cmd/sling/tests/files/test3.json b/cmd/sling/tests/files/test3.json
new file mode 100644
index 00000000..c76cf11f
--- /dev/null
+++ b/cmd/sling/tests/files/test3.json
@@ -0,0 +1,11 @@
+[
+  {
+    "Col1": "--Geschenk-Gutschein 3--",
+    "Col2": {
+      "Col3": {
+        "@Col4": "2",
+        "Col-5": "G75118_2.jpg"
+      }
+    }
+  }
+]
\ No newline at end of file
diff --git a/cmd/sling/tests/tasks.tsv b/cmd/sling/tests/tasks.tsv
index a6a2905c..b028db97 100644
--- a/cmd/sling/tests/tasks.tsv
+++ b/cmd/sling/tests/tasks.tsv
@@ -14,8 +14,8 @@ task.12 file:///tmp/ocral_test_1000.csv {} POSTGRES_SSH public.{stream_file_pat
 task.13 BIGQUERY select * from public.test1k where 1=1 {} POSTGRES public.test1k full-refresh {} {}
 task.14 SNOWFLAKE select * from public.test1k where 1=1 {} POSTGRES public.test1k full-refresh {} {}
 task.15 CLICKHOUSE select * from default.test1k where 1=1 {} POSTGRES public.test1k full-refresh {} {}
-task.16 file://tests/files/test1.csv {} POSTGRES public.test1 full-refresh {"adjust_column_type":true, "add_new_columns":true} {}
-task.17 file://tests/files/test1.upsert.csv {} POSTGRES public.test1 incremental id create_dt {"adjust_column_type":true, "add_new_columns":true} {} {"validation_file": "file://tests/files/test1.result.csv", "validation_cols": "0,1,2,3,4,6"}
+task.16 file://tests/files/test1.csv {} POSTGRES public.test1 full-refresh {"adjust_column_type":true, "add_new_columns":true, "use_bulk": true} {}
+task.17 file://tests/files/test1.upsert.csv {} POSTGRES public.test1 incremental id create_dt {"adjust_column_type":true, "add_new_columns":true, "use_bulk": true} {} {"validation_file": "file://tests/files/test1.result.csv", "validation_cols": "0,1,2,3,4,6"}
 task.18 file://tests/files/test1.csv {} SNOWFLAKE public.test1 full-refresh {"adjust_column_type":true, "add_new_columns":true} {}
 task.19 file://tests/files/test1.upsert.csv {} SNOWFLAKE public.test1 incremental id create_dt {"adjust_column_type":true, "add_new_columns":true} {} {"validation_file": "file://tests/files/test1.result.csv", "validation_cols": "0,1,2,3,4,6"}
 task.20 file://tests/files/test1.csv {} BIGQUERY public.test1 full-refresh {"adjust_column_type":true, "add_new_columns":true} {}
diff --git a/core/sling/task_run_write.go b/core/sling/task_run_write.go
index 23607c03..0e18c150 100644
--- a/core/sling/task_run_write.go
+++ b/core/sling/task_run_write.go
@@ -14,6 +14,7 @@ import (
 	"github.com/flarco/dbio/filesys"
 	"github.com/flarco/dbio/iop"
 	"github.com/flarco/g"
+	"github.com/samber/lo"
 	"github.com/spf13/cast"
 )
 
@@ -100,16 +101,29 @@ func (t *TaskExecution) WriteToDb(cfg *Config, df *iop.Dataflow, tgtConn databas
 	targetTable := cfg.Target.Object
 
 	if cfg.Target.Options.TableTmp == "" {
-		cfg.Target.Options.TableTmp = strings.ToLower(tgtConn.Unquote(targetTable))
+		tableTmp, err := database.ParseTableName(targetTable, tgtConn.GetType())
+		if err != nil {
+			return 0, g.Error(err, "could not parse object table name")
+		}
+
+		suffix := lo.Ternary(tgtConn.GetType().DBNameUpperCase(), "_TMP", "_tmp")
 		if g.In(tgtConn.GetType(), dbio.TypeDbOracle) {
-			if len(cfg.Target.Options.TableTmp) > 24 {
-				cfg.Target.Options.TableTmp = cfg.Target.Options.TableTmp[:24] // max is 30 chars
+			if len(tableTmp.Name) > 24 {
+				tableTmp.Name = tableTmp.Name[:24] // max is 30 chars
 			}
+
 			// some weird column / commit error, not picking up latest columns
-			cfg.Target.Options.TableTmp = cfg.Target.Options.TableTmp + "_tmp" + g.RandString(g.NumericRunes, 1) + strings.ToLower(g.RandString(g.AplhanumericRunes, 1))
-		} else {
-			cfg.Target.Options.TableTmp = cfg.Target.Options.TableTmp + "_tmp"
+			suffix2 := g.RandString(g.NumericRunes, 1) + g.RandString(g.AplhanumericRunes, 1)
+			suffix2 = lo.Ternary(
+				tgtConn.GetType().DBNameUpperCase(),
+				strings.ToUpper(suffix2),
+				strings.ToLower(suffix2),
+			)
+			suffix = suffix + suffix2
 		}
+
+		tableTmp.Name = tableTmp.Name + suffix
+		cfg.Target.Options.TableTmp = tableTmp.FullName()
 	}
+
 	if cfg.Mode == "" {
 		cfg.Mode = FullRefreshMode
@@ -170,12 +184,6 @@ func (t *TaskExecution) WriteToDb(cfg *Config, df *iop.Dataflow, tgtConn databas
 
 	if adjustColumnType {
 		df.OnColumnChanged = func(col iop.Column) error {
-			// sleep to allow transaction to close
-			// time.Sleep(100 * time.Millisecond)
-
-			// df.Context.Lock()
-			// defer df.Context.Unlock()
-
 			table, err := database.ParseTableName(cfg.Target.Options.TableTmp, tgtConn.GetType())
 			if err != nil {
 				return g.Error(err, "could not get temp table name for schema change")
diff --git a/go.mod b/go.mod
index 03036dab..f8c734c4 100644
--- a/go.mod
+++ b/go.mod
@@ -6,7 +6,7 @@ require (
 	github.com/c-bata/go-prompt v0.2.6
 	github.com/denisbrodbeck/machineid v1.0.1
 	github.com/dustin/go-humanize v1.0.0
-	github.com/flarco/dbio v0.3.292
+	github.com/flarco/dbio v0.3.297
 	github.com/flarco/g v0.1.63
 	github.com/getsentry/sentry-go v0.11.0
 	github.com/google/uuid v1.3.0
diff --git a/scripts/test.sh b/scripts/test.sh
index 03daff5e..7d1c91ba 100644
--- a/scripts/test.sh
+++ b/scripts/test.sh
@@ -19,6 +19,7 @@ go build -o sling-linux cmd/sling/*.go && chmod +x sling-linux
 
 cat cmd/sling/tests/files/test1.1.csv | ./sling-linux run --tgt-conn POSTGRES --tgt-object public.my_table --mode full-refresh
 cat cmd/sling/tests/files/test1.1.csv.gz | ./sling-linux run --tgt-conn POSTGRES --tgt-object public.my_table --mode full-refresh
+cat cmd/sling/tests/files/test3.json | ./sling-linux run --src-options "flatten: true" --tgt-conn POSTGRES --tgt-object public.my_table1 --tgt-options 'use_bulk: false' --mode full-refresh
 ./sling-linux run --src-conn POSTGRES --src-stream public.my_table --stdout > /tmp/my_table.csv
 ./sling-linux run --src-conn POSTGRES --src-stream public.my_table --tgt-object file:///tmp/my_table.csv
 ./sling-linux run -r cmd/sling/tests/replications/r.05.yaml
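A standalone sketch of the temp-table naming logic introduced in the task_run_write.go hunk above, for readers of this patch. It is illustrative only and not part of the commit: dbUpperCase and isOracle stand in for tgtConn.GetType().DBNameUpperCase() and the dbio.TypeDbOracle check, and math/rand stands in for g.RandString; only lo.Ternary matches the dependency the patch actually pulls in.

package main

import (
	"fmt"
	"math/rand"
	"strings"

	"github.com/samber/lo" // same helper added by the patch
)

// tempTableName mirrors the patched naming: a case-aware "_tmp"/"_TMP" suffix,
// plus truncation to 24 chars and a random two-character tail so Oracle stays
// under its 30-character identifier limit across repeated runs.
func tempTableName(name string, dbUpperCase, isOracle bool) string {
	suffix := lo.Ternary(dbUpperCase, "_TMP", "_tmp")

	if isOracle {
		if len(name) > 24 {
			name = name[:24] // leave room for the suffix under the 30-char cap
		}

		// random digit + alphanumeric tail, cased to match the database
		const digits = "0123456789"
		const alnum = "abcdefghijklmnopqrstuvwxyz0123456789"
		suffix2 := string(digits[rand.Intn(len(digits))]) + string(alnum[rand.Intn(len(alnum))])
		suffix += lo.Ternary(dbUpperCase, strings.ToUpper(suffix2), strings.ToLower(suffix2))
	}

	return name + suffix
}

func main() {
	fmt.Println(tempTableName("test1", false, false)) // test1_tmp
	fmt.Println(tempTableName("A_VERY_LONG_ORACLE_TABLE_NAME", true, true))
}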