Merge pull request #198 from slingdata-io/v1.1.12
v1.1.12
flarco authored Mar 1, 2024
2 parents 9b7a852 + 6d7a097 commit 7daf0f2
Showing 22 changed files with 647 additions and 137 deletions.
2 changes: 1 addition & 1 deletion cmd/sling/sling_logic.go
@@ -537,7 +537,7 @@ func processConns(c *g.CliSC) (ok bool, err error) {
} else {
g.Info("Found %d streams:", len(streamNames))
for _, sn := range streamNames {
- println(g.F(" - %s", sn))
+ env.Println(g.F(" - %s", sn))
}
}

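The one-line change above routes the stream listing through env.Println instead of Go's builtin println. A minimal standalone sketch of the difference (the stream name is illustrative, and env.Println's exact behavior is assumed rather than shown here): with the gc toolchain the builtin println writes to standard error, so a listing printed with it cannot be captured by piping or redirecting stdout.

package main

import "fmt"

func main() {
	// the builtin println has implementation-defined output; with the gc toolchain it goes to stderr
	println(" - my_schema.my_table")

	// fmt.Println goes to stdout, so piping the CLI output through grep would see this line;
	// sling's env.Println presumably behaves like this (assumption for illustration)
	fmt.Println(" - my_schema.my_table")
}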
68 changes: 35 additions & 33 deletions cmd/sling/sling_test.go
@@ -39,22 +39,23 @@ type testConn struct {
}

var dbConnMap = map[dbio.Type]testConn{
- dbio.TypeDbAzure: {name: "azuresql"},
- dbio.TypeDbAzureDWH: {name: "azuredwh"},
- dbio.TypeDbBigQuery: {name: "bigquery"},
- dbio.TypeDbBigTable: {name: "bigtable"},
- dbio.TypeDbClickhouse: {name: "clickhouse", schema: "default", useBulk: g.Bool(true)},
- dbio.TypeDbDuckDb: {name: "duckdb"},
- dbio.TypeDbMariaDB: {name: "mariadb", schema: "mariadb"},
- dbio.TypeDbMotherDuck: {name: "motherduck"},
- dbio.TypeDbMySQL: {name: "mysql", schema: "mysql"},
- dbio.TypeDbOracle: {name: "oracle", schema: "system"},
- dbio.TypeDbPostgres: {name: "postgres"},
- dbio.TypeDbRedshift: {name: "redshift"},
- dbio.TypeDbSnowflake: {name: "snowflake"},
- dbio.TypeDbSQLite: {name: "sqlite", schema: "main"},
- dbio.TypeDbSQLServer: {name: "mssql", schema: "dbo", useBulk: g.Bool(false)},
- dbio.TypeDbStarRocks: {name: "starrocks"},
+ dbio.TypeDbAzure: {name: "azuresql"},
+ dbio.TypeDbAzureDWH: {name: "azuredwh"},
+ dbio.TypeDbBigQuery: {name: "bigquery"},
+ dbio.TypeDbBigTable: {name: "bigtable"},
+ dbio.TypeDbClickhouse: {name: "clickhouse", schema: "default", useBulk: g.Bool(true)},
+ dbio.Type("clickhouse_http"): {name: "clickhouse_http", schema: "default", useBulk: g.Bool(true)},
+ dbio.TypeDbDuckDb: {name: "duckdb"},
+ dbio.TypeDbMariaDB: {name: "mariadb", schema: "mariadb"},
+ dbio.TypeDbMotherDuck: {name: "motherduck"},
+ dbio.TypeDbMySQL: {name: "mysql", schema: "mysql"},
+ dbio.TypeDbOracle: {name: "oracle", schema: "system"},
+ dbio.TypeDbPostgres: {name: "postgres"},
+ dbio.TypeDbRedshift: {name: "redshift"},
+ dbio.TypeDbSnowflake: {name: "snowflake"},
+ dbio.TypeDbSQLite: {name: "sqlite", schema: "main"},
+ dbio.TypeDbSQLServer: {name: "mssql", schema: "dbo", useBulk: g.Bool(false)},
+ dbio.TypeDbStarRocks: {name: "starrocks"},
}

func init() {
@@ -196,7 +197,7 @@ func TestExtract(t *testing.T) {
g.AssertNoError(t, err)
}

- func testSuite(dbType dbio.Type, t *testing.T) {
+ func testSuite(t *testing.T, dbType dbio.Type, connName ...string) {
conn, ok := dbConnMap[dbType]
if !assert.True(t, ok) {
return
@@ -423,82 +424,83 @@ func runOneTask(t *testing.T, file g.FileItem, dbType dbio.Type) {

func TestSuitePostgres(t *testing.T) {
t.Parallel()
- testSuite(dbio.TypeDbPostgres, t)
+ testSuite(t, dbio.TypeDbPostgres)
}

// func TestSuiteRedshift(t *testing.T) {
// t.Parallel()
- // testSuite(dbio.TypeDbRedshift, t)
+ // testSuite(t, dbio.TypeDbRedshift)
// }

func TestSuiteStarRocks(t *testing.T) {
t.Parallel()
- testSuite(dbio.TypeDbStarRocks, t)
+ testSuite(t, dbio.TypeDbStarRocks)
}

func TestSuiteMySQL(t *testing.T) {
t.Parallel()
- testSuite(dbio.TypeDbMySQL, t)
+ testSuite(t, dbio.TypeDbMySQL)
}

func TestSuiteMariaDB(t *testing.T) {
t.Parallel()
- testSuite(dbio.TypeDbMariaDB, t)
+ testSuite(t, dbio.TypeDbMariaDB)
}

func TestSuiteOracle(t *testing.T) {
t.Parallel()
- testSuite(dbio.TypeDbOracle, t)
+ testSuite(t, dbio.TypeDbOracle)
}

// func TestSuiteBigTable(t *testing.T) {
// t.Parallel()
- // testSuite(dbio.TypeDbBigTable, t)
+ // testSuite(t, dbio.TypeDbBigTable)
// }

func TestSuiteBigQuery(t *testing.T) {
t.Parallel()
- testSuite(dbio.TypeDbBigQuery, t)
+ testSuite(t, dbio.TypeDbBigQuery)
}

func TestSuiteSnowflake(t *testing.T) {
t.Parallel()
- testSuite(dbio.TypeDbSnowflake, t)
+ testSuite(t, dbio.TypeDbSnowflake)
}

func TestSuiteSQLite(t *testing.T) {
t.Parallel()
- testSuite(dbio.TypeDbSQLite, t)
+ testSuite(t, dbio.TypeDbSQLite)
}

func TestSuiteDuckDb(t *testing.T) {
t.Parallel()
- testSuite(dbio.TypeDbDuckDb, t)
+ testSuite(t, dbio.TypeDbDuckDb)
}

// func TestSuiteMotherDuck(t *testing.T) {
// t.Parallel()
- // testSuite(dbio.TypeDbMotherDuck, t)
+ // testSuite(t, dbio.TypeDbMotherDuck)
// }

func TestSuiteSQLServer(t *testing.T) {
t.Parallel()
- testSuite(dbio.TypeDbSQLServer, t)
+ testSuite(t, dbio.TypeDbSQLServer)
}

// func TestSuiteAzure(t *testing.T) {
// t.Parallel()
- // testSuite(dbio.TypeDbAzure, t)
+ // testSuite(t, dbio.TypeDbAzure)
// }

// func TestSuiteAzureDWH(t *testing.T) {
// t.Parallel()
- // testSuite(dbio.TypeDbAzureDWH, t)
+ // testSuite(t, dbio.TypeDbAzureDWH)
// }

func TestSuiteClickhouse(t *testing.T) {
t.Parallel()
- testSuite(dbio.TypeDbClickhouse, t)
+ testSuite(t, dbio.TypeDbClickhouse)
+ testSuite(t, dbio.Type("clickhouse_http"))
}

// generate large dataset or use cache
13 changes: 13 additions & 0 deletions core/dbio/connection/connection.go
@@ -528,9 +528,22 @@ func (c *Connection) setURL() (err error) {

case dbio.TypeDbClickhouse:
setIfMissing("username", c.Data["user"])
setIfMissing("username", "") // clickhouse can work without a user
setIfMissing("password", "")
setIfMissing("schema", c.Data["database"])
setIfMissing("port", c.Type.DefPort())

+ // parse http url
+ if httpUrlStr, ok := c.Data["http_url"]; ok {
+ u, err := url.Parse(cast.ToString(httpUrlStr))
+ if err != nil {
+ g.Warn("invalid http_url: %s", err.Error())
+ } else {
+ setIfMissing("host", u.Hostname())
+ }
+ setIfMissing("database", "default")
+ }

template = "clickhouse://{username}:{password}@{host}:{port}/{database}"
case dbio.TypeFileSftp:
setIfMissing("password", "")
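The new http_url branch above only lifts the hostname out of the HTTP endpoint and falls back to the default database; the rest of the native clickhouse:// template is filled from the other connection properties. A minimal sketch of that parsing with net/url (the endpoint below is made up):

package main

import (
	"fmt"
	"net/url"
)

func main() {
	// hypothetical ClickHouse HTTP endpoint, as it might appear under the http_url property
	httpURL := "https://abc123.eu-west-1.clickhouse.example:8443/?user=default"

	u, err := url.Parse(httpURL)
	if err != nil {
		// setURL only warns on a bad http_url; it does not abort the connection setup
		fmt.Println("invalid http_url:", err)
		return
	}

	// the hostname feeds the {host} placeholder of the native connection template
	fmt.Println(u.Hostname()) // abc123.eu-west-1.clickhouse.example
	fmt.Println("clickhouse://{username}:{password}@" + u.Hostname() + ":{port}/default")
}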
1 change: 0 additions & 1 deletion core/dbio/connection/local.go
@@ -377,7 +377,6 @@ func (ec *EnvConns) testDiscover(name string, opt DiscoverOptions) (ok bool, str
if len(streamNames) > 0 && conn.Connection.Type.IsFile() &&
folder == "" {
g.Warn("Those are non-recursive folder or file names (at the root level). Please use --folder flag to list sub-folders")
- println()
}
} else {
ok = true
42 changes: 34 additions & 8 deletions core/dbio/database/database.go
@@ -113,7 +113,7 @@ type Connection interface {
LoadTemplates() error
MustExec(sql string, args ...interface{}) (result sql.Result)
NewTransaction(ctx context.Context, options ...*sql.TxOptions) (Transaction, error)
- OptimizeTable(table *Table, columns iop.Columns) (ok bool, err error)
+ OptimizeTable(table *Table, columns iop.Columns, isTemp ...bool) (ok bool, err error)
Prepare(query string) (stmt *sql.Stmt, err error)
ProcessTemplate(level, text string, values map[string]interface{}) (sql string, err error)
Props() map[string]string
@@ -640,7 +640,7 @@ func (conn *BaseConn) Connect(timeOut ...int) (err error) {
}
}
}
return g.Error(err, "could not connect to database"+msg)
return g.Error(err, "could not connect to database"+CleanSQL(conn, msg))
}
}

@@ -1515,9 +1515,9 @@ func (conn *BaseConn) GetTableColumns(table *Table, fields ...string) (columns i
g.M("schema", table.Schema, "table", table.Name),
)
if err != nil {
- return columns, g.Error(err, "could not get list of columns for table: "+table.FullName())
+ return columns, g.Error(err, "could not get list of columns for %s", table.FullName())
} else if len(colData.Rows) == 0 {
return columns, g.Error("did not find any columns for table: " + table.FullName())
return columns, g.Error("did not find any columns for %s. Perhaps it does not exists, or user does not have read permission.", table.FullName())
}

// if fields provided, check if exists in table
@@ -2365,8 +2365,28 @@ func (conn *BaseConn) GenerateDDL(table Table, data iop.Dataset, temporary bool)
data.InferColumnTypes()
}
columnsDDL := []string{}
+ columns := data.Columns

- for _, col := range data.Columns {
+ // re-order columns for starrocks (keys first)
+ if g.In(conn.GetType(), dbio.TypeDbStarRocks) {
+ orderedColumns := iop.Columns{}
+
+ for _, col := range columns {
+ if col.IsKeyType(iop.PrimaryKey) || col.IsKeyType(iop.DuplicateKey) || col.IsKeyType(iop.HashKey) || col.IsKeyType(iop.AggregateKey) || col.IsKeyType(iop.UniqueKey) {
+ orderedColumns = append(orderedColumns, col)
+ }
+ }
+
+ for _, col := range columns {
+ if !g.In(col.Name, orderedColumns.Names()...) {
+ orderedColumns = append(orderedColumns, col)
+ }
+ }
+
+ columns = orderedColumns
+ }
+
+ for _, col := range columns {
// convert from general type to native type
nativeType, err := conn.GetNativeType(col)
if err != nil {
@@ -2740,7 +2760,7 @@ func (conn *BaseConn) GetColumnStats(tableName string, fields ...string) (column
// Hole in this: will truncate data points, since it is based
// only on new data being inserted... would need a complete
// stats of the target table to properly optimize.
- func GetOptimizeTableStatements(conn Connection, table *Table, newColumns iop.Columns) (ok bool, ddlParts []string, err error) {
+ func GetOptimizeTableStatements(conn Connection, table *Table, newColumns iop.Columns, isTemp bool) (ok bool, ddlParts []string, err error) {
if missing := table.Columns.GetMissing(newColumns...); len(missing) > 0 {
return false, ddlParts, g.Error("missing columns: %#v\ntable.Columns: %#v\nnewColumns: %#v", missing.Names(), table.Columns.Names(), newColumns.Names())
} else if g.In(conn.GetType(), dbio.TypeDbSQLite) {
@@ -2777,6 +2797,8 @@ func GetOptimizeTableStatements(conn Connection, table *Table, newColumns iop.Co
newCol.Type = iop.IntegerType
case col.Type == iop.IntegerType && newCol.Type == iop.SmallIntType:
newCol.Type = iop.IntegerType
+ case isTemp && col.IsString() && newCol.HasNulls() && (newCol.IsDatetime() || newCol.IsNumber() || newCol.IsBool()):
+ // use new type
case col.Type == iop.TextType || newCol.Type == iop.TextType:
newCol.Type = iop.TextType
default:
@@ -2904,8 +2926,12 @@ func GetOptimizeTableStatements(conn Connection, table *Table, newColumns iop.Co
return true, ddlParts, nil
}

- func (conn *BaseConn) OptimizeTable(table *Table, newColumns iop.Columns) (ok bool, err error) {
- ok, ddlParts, err := GetOptimizeTableStatements(conn, table, newColumns)
+ func (conn *BaseConn) OptimizeTable(table *Table, newColumns iop.Columns, isTemp ...bool) (ok bool, err error) {
+ IsTemp := false
+ if len(isTemp) > 0 {
+ IsTemp = isTemp[0]
+ }
+ ok, ddlParts, err := GetOptimizeTableStatements(conn, table, newColumns, IsTemp)
if err != nil {
return ok, err
}
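The OptimizeTable change above turns isTemp into a trailing variadic parameter, so existing two-argument callers keep compiling while new call sites can opt into the relaxed temp-table rule (a string column may adopt the incoming datetime/number/bool type when the new data contains nulls). A minimal sketch of that optional-flag idiom, with illustrative names rather than sling's types:

package main

import "fmt"

// optimize mirrors the shape of the new OptimizeTable signature: a variadic
// trailing bool acts as an optional flag, defaulting to false when omitted.
func optimize(table string, newColumns []string, isTemp ...bool) {
	temp := false
	if len(isTemp) > 0 {
		temp = isTemp[0]
	}
	fmt.Printf("optimizing %s with %d columns (temp=%v)\n", table, len(newColumns), temp)
}

func main() {
	optimize("public.orders", []string{"id", "amount"})           // old call shape still works
	optimize("public.orders_tmp", []string{"id", "amount"}, true) // new call shape for temp tables
}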
39 changes: 37 additions & 2 deletions core/dbio/database/database_bigquery.go
@@ -11,6 +11,7 @@ import (
"os"
"path"
"strings"
"sync"
"time"

"github.com/dustin/go-humanize"
@@ -38,6 +39,7 @@ type BigQueryConn struct {
DatasetID string
Location string
Datasets []string
+ Mux sync.Mutex
}

// Init initiates the object
@@ -296,7 +298,7 @@ func processBQTypeCols(row []interface{}, bqTC *bQTypeCols, ds *iop.Datastream)
var vBR *big.Rat
vBR, ok := row[j].(*big.Rat)
if ok {
- row[j], _ = vBR.Float64()
+ row[j] = vBR.FloatString(9)
}
}
for _, j := range bqTC.datetimeCols {
@@ -349,7 +351,12 @@ func (conn *BigQueryConn) getItColumns(itSchema bigquery.Schema) (cols iop.Colum
}
cols[i].SetLengthPrecisionScale()

- if g.In(field.Type, bigquery.NumericFieldType, bigquery.FloatFieldType) {
+ if g.In(field.Type, bigquery.NumericFieldType) {
+ bQTC.numericCols = append(bQTC.numericCols, i)
+ // https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#decimal_types
+ cols[i].DbPrecision = 38
+ cols[i].DbScale = 9
+ } else if g.In(field.Type, bigquery.FloatFieldType) {
bQTC.numericCols = append(bQTC.numericCols, i)
cols[i].Sourced = false // need to infer the decimal lengths
} else if field.Type == "DATETIME" || field.Type == bigquery.TimestampFieldType {
@@ -523,6 +530,28 @@ func getBqSchema(columns iop.Columns) (schema bigquery.Schema) {
func (conn *BigQueryConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error) {
defer df.CleanUp()

+ // set OnSchemaChange
+ if df != nil && cast.ToBool(conn.GetProp("adjust_column_type")) {
+ oldOnColumnChanged := df.OnColumnChanged
+ df.OnColumnChanged = func(col iop.Column) error {
+ // prevent any new writers
+ conn.Mux.Lock()
+ defer conn.Mux.Unlock()
+
+ // wait till all current writers are done
+ if qs := conn.Context().Wg.Write.GetQueueSize(); qs > 0 {
+ conn.Context().Wg.Write.Wait()
+ }
+
+ // use pre-defined function
+ err = oldOnColumnChanged(col)
+ if err != nil {
+ return g.Error(err, "could not process ColumnChange for BigQuery")
+ }
+ return nil
+ }
+ }
+
if gcBucket := conn.GetProp("GC_BUCKET"); gcBucket == "" {
return conn.importViaLocalStorage(tableFName, df)
}
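The OnColumnChanged hook added above has to quiesce in-flight load jobs before a column type can be adjusted: the mutex stops new writers from being queued, and the wait group drains the ones already running (the same mutex is taken around Wg.Write.Add() in the two import paths below). A generic sketch of that quiesce pattern, with illustrative names rather than sling's API:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var mu sync.Mutex // guards admission of new load jobs
	var inFlight sync.WaitGroup

	submit := func(id int) {
		mu.Lock() // a schema change holding the lock blocks new writers here
		inFlight.Add(1)
		go func() {
			defer inFlight.Done()
			time.Sleep(50 * time.Millisecond) // stand-in for a load job
			fmt.Println("load job", id, "done")
		}()
		mu.Unlock()
	}

	for i := 1; i <= 3; i++ {
		submit(i)
	}

	// schema change: block new writers, wait for current ones, then alter the column
	mu.Lock()
	inFlight.Wait()
	fmt.Println("column type adjusted safely")
	mu.Unlock()
}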
@@ -588,8 +617,11 @@ func (conn *BigQueryConn) importViaLocalStorage(tableFName string, df *iop.Dataf
break
}
time.Sleep(2 * time.Second) // max 5 load jobs per 10 secs

+ conn.Mux.Lock() // to not collide with schema change
conn.Context().Wg.Write.Add()
go copyFromLocal(localFile, table)
+ conn.Mux.Unlock()
}

conn.Context().Wg.Write.Wait()
@@ -670,8 +702,11 @@ func (conn *BigQueryConn) importViaGoogleStorage(tableFName string, df *iop.Data
break
}
time.Sleep(2 * time.Second) // max 5 load jobs per 10 secs

+ conn.Mux.Lock() // to not collide with schema change
conn.Context().Wg.Write.Add()
go copyFromGCS(gcsFile, table)
+ conn.Mux.Unlock()
}

conn.Context().Wg.Write.Wait()
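Two related BigQuery changes above: NUMERIC columns are now reported with precision 38 and scale 9 (BigQuery's documented NUMERIC bounds), and NUMERIC values read as *big.Rat are rendered with FloatString(9) instead of being squeezed through Float64. A small sketch (the value is illustrative) of the digits that float64 would otherwise drop:

package main

import (
	"fmt"
	"math/big"
)

func main() {
	// BigQuery NUMERIC values arrive as *big.Rat; this one has 18 significant digits
	r := new(big.Rat)
	r.SetString("123456789.123456789")

	f, _ := r.Float64()           // float64 keeps only ~15-17 significant digits
	fmt.Println(f)                // 1.2345678912345679e+08 (last digits rounded)
	fmt.Println(r.FloatString(9)) // 123456789.123456789 (all nine decimals preserved)
}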