Skip to content

Commit

Permalink
Merge pull request #486 from slingdata-io/v1.3.6
Browse files Browse the repository at this point in the history
v1.3.6
  • Loading branch information
flarco authored Jan 22, 2025
2 parents 79bd269 + f4567f8 commit c112999
Show file tree
Hide file tree
Showing 42 changed files with 2,181 additions and 300 deletions.
5 changes: 1 addition & 4 deletions cmd/sling/sling_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,10 +428,7 @@ func Track(event string, props ...map[string]interface{}) {
"User-Agent": g.F("sling-cli/%s (%s) %s", core.Version, runtime.GOOS, machineID),
}
body := strings.NewReader(g.Marshal(payload))
resp, respBytes, _ := net.ClientDo(http.MethodPost, env.PlausibleURL, body, h, 5)
if resp != nil {
g.Trace("post event response: %s\n%s", resp.Status, string(respBytes))
}
net.ClientDo(http.MethodPost, env.PlausibleURL, body, h, 5)
}
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/sling/sling_media.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ var SlingMedia = media{}

func (m media) PrintFollowUs() {
choices := []string{
g.F("%s 👉 %s", color.HiGreenString("Follow Sling's Evolution"), color.HiBlueString("https://x.com/SlingDataIO")),
g.F("%s => %s", color.HiGreenString("Follow Sling's Evolution"), color.HiBlueString("https://x.com/SlingDataIO")),
// g.F("%s%s", color.HiGreenString("Follow Sling's Evolution: "), color.HiBlueString("https://linkedin.com/company/slingdata-io")),
}
i := g.RandInt(len(choices))
Expand Down
4 changes: 2 additions & 2 deletions cmd/sling/sling_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,11 +455,11 @@ func replicationRun(cfgPath string, cfgOverwrite *sling.Config, selectStreams ..
}

// parse hooks
startHooks, err := replication.ParseDefaultHook(sling.HookStageStart)
startHooks, err := replication.ParseReplicationHook(sling.HookStageStart)
if err != nil {
return g.Error(err, "could not parse start hooks")
}
endHooks, err := replication.ParseDefaultHook(sling.HookStageEnd)
endHooks, err := replication.ParseReplicationHook(sling.HookStageEnd)
if err != nil {
return g.Error(err, "could not parse end hooks")
}
Expand Down
3 changes: 3 additions & 0 deletions cmd/sling/sling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,9 @@ func testSuite(t *testing.T, connType dbio.Type, testSelect ...string) {
streamConfig["sql"] = g.String(streamName)
streamName = testName
}
if where, ok := sourceOptions["where"]; ok {
streamConfig["where"] = where
}

replicationConfig := g.M(
"source", cast.ToString(rec["source_conn"]),
Expand Down
9 changes: 5 additions & 4 deletions cmd/sling/tests/replications/r.09.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ target: sqlite
defaults:
mode: full-refresh
object: 'main.pg_{stream_table}'
sql: "select *, 'hi' there from {stream_name} limit 10"
sql: "select *, 'hi' there from {stream_name} {where_clause} limit 10"

streams:
public.my_table?:
tags: [ my_table ]
where: '2 > 1'
public.my_table:
hooks:
pre:
Expand All @@ -19,15 +20,15 @@ streams:

post:
- type: http
if: run.status = 'success'
if: run.status == "success"
url: https://webhook.ocral.org/status
id: webhook

- type: log
message: runtime_state => '{runtime_state}'

- type: query
if: hooks.webhook.status = success
if: hooks.webhook.status == "success"
connection: '{target.name}'
query: |
select
Expand All @@ -40,4 +41,4 @@ streams:
public.my_table_*:
columns:
there: string | value_len = 3
there: string | value_len == 3
3 changes: 2 additions & 1 deletion cmd/sling/tests/replications/r.16.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ defaults:

streams:
oracle.TEST1K_ORACLE_WIDE:
object: /tmp/test.csv
object: /tmp/test.csv
where: '2 > 1'
65 changes: 65 additions & 0 deletions cmd/sling/tests/replications/r.19.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Test replication from aws_s3 to postgres: loads parquet files found under a
# {part_year}/{part_month} path template into public.test1k_postgres_pg_parquet
# in incremental mode, and exercises replication-level start/end hooks plus a
# stream-level post hook.
source: aws_s3
target: postgres

env:
# presumably where the incremental state is persisted between runs (the start
# "delete" hook below removes this same path on aws_s3) -- TODO confirm
SLING_STATE: aws_s3/sling-state/test/r.19
# forwarded from the caller's environment; gates the reset hooks below
RESET: ${RESET}

hooks:
# hooks listed under the "start" stage
start:
- type: log
message: 'runtime_state => {runtime_state}'

# only runs when RESET is "true"
- type: delete
if: env.RESET == "true"
connection: aws_s3
path: sling-state/test/r.19 # delete state on replication start

# result is referenced later as hooks.inspect_file
- type: inspect
id: inspect_file
location: aws_s3/test/public_test1k_postgres_pg_parquet
message: 'runtime_state ==> {runtime_state}'

# test check, should fail and warn
- type: check
id: inspect_file_check
check: hooks.inspect_file.is_dir != true
on_failure: warn

# only runs when RESET is "true": drops the target table of the stream below
- type: query
connection: postgres
if: env.RESET == "true"
query: 'drop table public.test1k_postgres_pg_parquet'

# hooks listed under the "end" stage
end:

- type: log
message: |
inspect-output ==> {hooks.inspect_file}
runtime_state ==> {runtime_state}
- type: copy
id: file-copy
from: aws_s3/sling-state/test/r.19
to: local//tmp/test/sling-state/test/r.19

# warn (rather than fail) when the "file-copy" hook reports no bytes written;
# bracket syntax is used because the hook id contains a dash
- type: check
check: hooks["file-copy"].bytes_written > 0
on_failure: warn

# single stream; its id is what a --streams selector can refer to
streams:
test/public_test1k_postgres_pg_parquet/{part_year}/{part_month}/:
id: test1k_postgres_pg_parquet
object: public.test1k_postgres_pg_parquet
mode: incremental
primary_key: [id]
update_key: update_dt
source_options:
format: parquet
hooks:
post:
- type: log
message: |
run-output ==> {run}
2 changes: 2 additions & 0 deletions cmd/sling/tests/suite.cli.tsv
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,5 @@ n test_name rows bytes streams fails output_contains command
64 Run sling project status PROJECT NAME sling project status
65 Run sling project jobs manage project jobs sling project jobs
66 Run sling project jobs list FILE NAME sling project jobs list
67 Run sling hooks & source partitioned (backfill) 551 "executed hook ""start-02"" (type: delete)|writing incremental state (value => 2019-06-01" RESET=true sling run -r cmd/sling/tests/replications/r.19.yaml -d --mode backfill --range 2018-01-01,2019-05-01
68 Run sling hooks & source partitioned (incremental) >78 "skipped hook ""start-02""|hook (inspect_file_check) failed => check failure|writing incremental state (value => 2019-07-01" sling run -r cmd/sling/tests/replications/r.19.yaml -d --streams 'test1k_postgres_pg_parquet'
4 changes: 2 additions & 2 deletions cmd/sling/tests/suite.db.template.tsv
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ n test_name source_conn source_stream source_options stream_config target_conn t
1 csv_full_refresh local file://tests/files/test1.csv "{""columns"": {""first_name"": ""string(100)"", ""update_dt"": ""timestampz"", ""email"": ""string(150)""}, ""transforms"": {""update_dt"":[""set_timezone(\""America/New_York\"")""]}}" {} [conn] [schema].[table] full-refresh "{""adjust_column_type"":true, ""add_new_columns"":true, ""use_bulk"": true, ""pre_sql"" : ""{drop_view}"", ""table_keys"": { ""unique"": [ ""id"" ], ""index"": [ ""code"" ] }}" "{""validation_row_count"": ""1000"", ""validation_types"": {""code"":""decimal"",""create_dt"":""timestamp"",""date"":""date"",""rating"":""decimal"",""target"":""bool"",""update_dt"":""timestampz""}}"
2 csv_full_refresh_delimiter local file://tests/files/test5.csv {} [conn] [schema].[table]_2 full-refresh "{""post_sql"" : ""{drop_view}""}" "{""validation_row_count"": "">0""}"
3 discover_table [conn] [schema].[table] discover "{""validation_contains"": ""create_dt"", ""validation_row_count"": ""11"", ""level"": ""column""}"
4 parquet_snapshot local file://tests/files/test1.parquet {} [conn] [schema].[table]_snapshot snapshot "{""post_sql"" : ""drop table [schema].[table]_2""}" "{""validation_row_count"": "">999"", ""validation_types"": {""code"":""decimal"",""create_dt"":""timestampz"",""date"":""date"",""rating"":""decimal"",""target"":""bool""}}"
4 parquet_snapshot local file://tests/files/test1.parquet "{""columns"": {""date"": ""date""}}" [conn] [schema].[table]_snapshot snapshot "{""post_sql"" : ""drop table [schema].[table]_2""}" "{""validation_row_count"": "">999"", ""validation_types"": {""code"":""decimal"",""create_dt"":""timestampz"",""date"":""date"",""rating"":""decimal"",""target"":""bool""}}"
5 parquet_truncate local file://tests/files/test1.parquet "{""columns"": {""rating"": ""float""}}" [conn] [schema].[table]_truncate truncate "{""pre_sql"": ""drop table [schema].[table]_snapshot "", ""post_sql"" : ""drop table [schema].[table]_truncate""}"
6 csv_wide_full_refresh local file://tests/files/test.wide.csv "{""limit"": 90}" {} [conn] [schema].[table]_wide full-refresh "{""adjust_column_type"":true, ""add_new_columns"":true, ""use_bulk"": false, ""table_keys"": { ""unique"": [ ""id"" ] }}" "{""validation_row_count"": ""90""}"
7 table_full_refresh_into_postgres [conn] [schema].[table] {} {} postgres public.[table]_pg full-refresh "{""adjust_column_type"":true, ""add_new_columns"":true, ""use_bulk"": true}" "{""validation_row_count"": "">900"", ""validation_types"": {""code"":""decimal"",""create_dt"":""timestamp"",""date"":""date"",""rating"":""decimal"",""target"":""bool"",""update_dt"":""timestampz""}}"
8 table_full_refresh_into_postgres_orig [conn] [schema].[table] {} {} postgres public.[table]_pg_orig full-refresh "{""adjust_column_type"":true, ""add_new_columns"":true, ""use_bulk"": true}"
9 csv_incremental local file://tests/files/test1.upsert.csv "{""columns"": {""first_name"": ""string(100)"", ""email"": ""string(150)""}}" {} [conn] [schema].[table] incremental id create_dt "{""adjust_column_type"":true, ""add_new_columns"":true, ""use_bulk"": true, ""table_keys"": { ""primary"": [ ""id"" ] }, ""post_sql"" : ""create view [schema].[table]_vw as select * from [schema].[table]""}" "{""validation_file"": ""file://tests/files/test1.result.csv"", ""validation_cols"": ""0,1,2,3,4,6"", ""validation_row_count"": "">0"", ""validation_stream_row_count"": 14, ""validation_types"": {""code"":""decimal"",""create_dt"":""timestamp"",""date"":""date"",""rating"":""decimal"",""target"":""bool"",""update_dt"":""timestampz"", ""json_data"":""json""}}"
10 discover_schema [conn] [schema].* discover "{""validation_contains"": ""[table],[table]_wide,[table]_vw"", ""level"": ""table""}"
11 discover_filter [conn] [schema].[table]_v* discover "{""validation_contains"": ""[table]_vw"", ""validation_row_count"": "">0"", ""level"": ""table""}"
12 table_incremental_into_postgres [conn] [schema].[table] "{""limit"": 10000}" {} postgres public.[table]_pg incremental id create_dt "{""adjust_column_type"":true, ""add_new_columns"":true, ""use_bulk"": true}" "{""validation_file"": ""file://tests/files/test1.result.csv"", ""validation_cols"": ""0,1,2,3,4,6"", ""validation_stream_row_count"": "">0"", ""validation_types"": {""code"":""decimal"",""create_dt"":""timestamp"",""date"":""date"",""rating"":""decimal"",""target"":""bool"",""update_dt"":""timestampz"", ""json_data"":""json""}}"
12 table_incremental_into_postgres [conn] [schema].[table] "{""limit"": 10000, ""where"": ""id > -1""}" {} postgres public.[table]_pg incremental id create_dt "{""adjust_column_type"":true, ""add_new_columns"":true, ""use_bulk"": true}" "{""validation_file"": ""file://tests/files/test1.result.csv"", ""validation_cols"": ""0,1,2,3,4,6"", ""validation_stream_row_count"": "">0"", ""validation_types"": {""code"":""decimal"",""create_dt"":""timestamp"",""date"":""date"",""rating"":""decimal"",""target"":""bool"",""update_dt"":""timestampz"", ""json_data"":""json""}}"
13 view_full_refresh_into_postgres [conn] [schema].[table]_vw "{""limit"": 100, ""offset"": 50}" {} postgres public.[table]_pg_vw full-refresh "{""adjust_column_type"":true, ""add_new_columns"":true, ""use_bulk"": true}" "{""validation_row_count"": ""100""}"
14 sql_full_refresh_into_postgres [conn] select t1.*, {seq_num} as seq_num from [schema].[table] t1 where 1=1 {} {} postgres public.[table]_pg full-refresh "{""adjust_column_type"":true, ""add_new_columns"":true, ""use_bulk"": true}" "{""validation_file"": ""file://tests/files/test1.result.csv"", ""validation_cols"": ""0,1,2,3,4,6"", ""seq_num"": 123}"
15 sql_incremental_into_postgres [conn] select * from [schema].[table] where {incremental_where_cond} "{""limit"": 10000}" {} postgres public.[table]_pg incremental id update_dt "{""adjust_column_type"":true, ""add_new_columns"":true, ""use_bulk"": true}" "{""validation_file"": ""file://tests/files/test1.result.csv"", ""validation_cols"": ""0,1,2,3,4,6"", , ""validation_stream_row_count"": "">0""}"
Expand Down
6 changes: 4 additions & 2 deletions core/dbio/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,9 @@ func (c *Connection) setURL() (err error) {
setIfMissing("bucket", U.U.Host)
}
if c.Type == dbio.TypeFileAzure {
setIfMissing("account", strings.ReplaceAll(U.U.Host, ".blob.core.windows.net", ""))
account := strings.ReplaceAll(U.U.Host, ".blob.core.windows.net", "")
account = strings.ReplaceAll(account, ".dfs.core.windows.net", "")
setIfMissing("account", account)
setIfMissing("container", strings.ReplaceAll(U.U.Path, "/", ""))
}
}
Expand Down Expand Up @@ -729,7 +731,7 @@ func (c *Connection) setURL() (err error) {
return nil
default:
if c.Type.IsUnknown() {
g.Trace("no type detected")
g.Trace("no type detected for %s", c.Name)
}
return nil
}
Expand Down
14 changes: 11 additions & 3 deletions core/dbio/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
_ "github.com/lib/pq"
_ "github.com/mattn/go-sqlite3"
_ "github.com/microsoft/go-mssqldb"
_ "github.com/microsoft/go-mssqldb/azuread"
_ "github.com/microsoft/go-mssqldb/integratedauth/krb5"
"github.com/slingdata-io/sling-cli/core/dbio/iop"
_ "github.com/snowflakedb/gosnowflake"
Expand Down Expand Up @@ -331,7 +332,8 @@ func NewConnContext(ctx context.Context, URL string, props ...string) (Connectio
return conn, err
}

func getDriverName(dbType dbio.Type) (driverName string) {
func getDriverName(conn Connection) (driverName string) {
dbType := conn.GetType()
switch dbType {
case dbio.TypeDbPostgres, dbio.TypeDbRedshift:
driverName = "postgres"
Expand All @@ -356,6 +358,12 @@ func getDriverName(dbType dbio.Type) (driverName string) {
default:
driverName = dbType.String()
}

// for custom specified driver
if driver := conn.GetProp("driver"); driver != "" {
return driver
}

return
}

Expand Down Expand Up @@ -629,9 +637,9 @@ func (conn *BaseConn) Connect(timeOut ...int) (err error) {
g.Trace("connURL -> %s", connURL)

if !usePool || !poolOk {
db, err = sqlx.Open(getDriverName(conn.Type), connURL)
db, err = sqlx.Open(getDriverName(conn), connURL)
if err != nil {
return g.Error(err, "Could not connect to DB: "+getDriverName(conn.Type))
return g.Error(err, "Could not connect to DB: "+getDriverName(conn))
}
} else {
conn.SetProp("POOL_USED", cast.ToString(poolOk))
Expand Down
23 changes: 2 additions & 21 deletions core/dbio/database/database_duckdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,30 +337,11 @@ func (conn *DuckDbConn) importViaHTTP(tableFName string, df *iop.Dataflow) (coun
}

func (conn *DuckDbConn) defaultCsvConfig() (config iop.StreamConfig) {
config = iop.DefaultStreamConfig()
config.FileMaxRows = 250000
config.Header = true
config.Delimiter = ","
config.Escape = `"`
config.Quote = `"`
config.NullAs = `\N`
config.DatetimeFormat = conn.Type.GetTemplateValue("variable.timestampz_layout")
return config
return conn.duck.DefaultCsvConfig()
}

func (conn *DuckDbConn) generateCsvColumns(columns iop.Columns) (colStr string) {
// {'FlightDate': 'DATE', 'UniqueCarrier': 'VARCHAR', 'OriginCityName': 'VARCHAR', 'DestCityName': 'VARCHAR'}

colsArr := make([]string, len(columns))
for i, col := range columns {
nativeType, err := conn.GetNativeType(col)
if err != nil {
g.Warn(err.Error())
}
colsArr[i] = g.F("'%s':'%s'", col.Name, nativeType)
}

return "{" + strings.Join(colsArr, ", ") + "}"
return conn.duck.GenerateCsvColumns(columns)
}

// GenerateUpsertSQL generates the upsert SQL
Expand Down
44 changes: 44 additions & 0 deletions core/dbio/database/database_sqlserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"github.com/dustin/go-humanize"
"github.com/flarco/g"
"github.com/flarco/g/net"
mssql "github.com/microsoft/go-mssqldb"
"github.com/microsoft/go-mssqldb/azuread"
"github.com/slingdata-io/sling-cli/core/dbio/iop"
"github.com/spf13/cast"
"github.com/xo/dburl"
Expand Down Expand Up @@ -79,6 +81,20 @@ func (conn *MsSQLServerConn) GetURL(newURL ...string) string {
return url.String()
}

// SqlServerLogger forwards driver log output to sling's env print helpers.
// It exposes the Printf/Println pair expected by mssql.SetLogger.
type SqlServerLogger struct{}

// Printf formats the message with g.F and writes it with env.Print.
func (l *SqlServerLogger) Printf(format string, v ...any) {
	env.Print(g.F(format, v...))
}

// Println writes a single line with env.Println.
//
// With one value, the value is printed as-is (converted to a string). With
// several values, the first is treated as a format string and the remaining
// values as its arguments. With no values, nothing is printed.
func (l *SqlServerLogger) Println(v ...any) {
	switch len(v) {
	case 0:
		// nothing to log
	case 1:
		env.Println(cast.ToString(v[0]))
	default:
		// v[0] is the format string, so only v[1:] may be passed as arguments;
		// passing all of v would feed the format string to its own first verb
		// and leave one surplus argument at the end.
		env.Println(g.F(cast.ToString(v[0]), v[1:]...))
	}
}

func (conn *MsSQLServerConn) ConnString() string {

propMapping := map[string]string{
Expand Down Expand Up @@ -159,6 +175,9 @@ func (conn *MsSQLServerConn) ConnString() string {
// Azure Active Directory authentication (https://github.com/microsoft/go-mssqldb?tab=readme-ov-file#azure-active-directory-authentication)
"fedauth": "fedauth",
"fed_auth": "fedauth",

"clientcertpath": "clientcertpath",
"client_cert_path": "clientcertpath",
}

U, _ := net.NewURL(conn.GetURL())
Expand All @@ -168,6 +187,31 @@ func (conn *MsSQLServerConn) ConnString() string {
}
}

AdAuthStrings := []string{
// azuread.ActiveDirectoryPassword,
// azuread.ActiveDirectoryIntegrated,
azuread.ActiveDirectoryMSI,
azuread.ActiveDirectoryInteractive,
azuread.ActiveDirectoryDefault,
azuread.ActiveDirectoryManagedIdentity,
azuread.ActiveDirectoryServicePrincipal,
azuread.ActiveDirectoryAzCli,
azuread.ActiveDirectoryDeviceCode,
azuread.ActiveDirectoryApplication,
}
if fedAuth := conn.GetProp("fedauth"); g.In(fedAuth, AdAuthStrings...) {
conn.SetProp("driver", "azuresql")
}

if certPath := os.Getenv("AZURE_CLIENT_CERTIFICATE_PATH"); certPath != "" {
// https://github.com/microsoft/go-sqlcmd/blob/main/pkg/sqlcmd/azure_auth.go#L40
U.SetParam("clientcertpath", certPath)
}

if val := conn.GetProp("log"); val != "" {
mssql.SetLogger(&SqlServerLogger{})
}

return U.String()
}

Expand Down
Loading

0 comments on commit c112999

Please sign in to comment.