diff --git a/cmd/sling/tests/replications/r.17.yaml b/cmd/sling/tests/replications/r.17.yaml
index 0b746882..aa980212 100644
--- a/cmd/sling/tests/replications/r.17.yaml
+++ b/cmd/sling/tests/replications/r.17.yaml
@@ -9,9 +9,10 @@ streams:
     update_key: update_dt
     primary_key: id
     mode: incremental
-    object: test/{stream_name}/{part_year}/{part_month}
+    object: test/{stream_name}_{format}/{part_year}/{part_month}
     target_options:
-      format: parquet
+      format: '{format}'
 
 env:
-  SLING_STATE: AWS_S3/state/r.17
\ No newline at end of file
+  SLING_STATE: AWS_S3/state/r.17/${FORMAT}
+  format: ${FORMAT}
\ No newline at end of file
diff --git a/cmd/sling/tests/suite.cli.tsv b/cmd/sling/tests/suite.cli.tsv
index f8816e3b..3ac4433d 100644
--- a/cmd/sling/tests/suite.cli.tsv
+++ b/cmd/sling/tests/suite.cli.tsv
@@ -58,5 +58,6 @@ n	test_name	rows	bytes	streams	fails	output_contains	command
 57	Run sling with incremental (delete missing soft)	0				and not exists (	sling run -d --src-conn postgres --src-stream 'select * from public.test1k_postgres_pg where {incremental_where_cond} limit 900' --tgt-conn mysql --tgt-object 'mysql.public_test1k_postgres_pg' --mode incremental --primary-key id --update-key create_dt --tgt-options '{ delete_missing: soft }'
 58	Run sling with incremental (delete missing hard)	0				and not exists (	sling run -d --src-conn postgres --src-stream 'select * from public.test1k_postgres_pg where {incremental_where_cond} limit 900' --tgt-conn mysql --tgt-object 'mysql.public_test1k_postgres_pg' --mode incremental --primary-key id --update-key create_dt --tgt-options '{ delete_missing: hard }'
 59	Run sling writing to partitioned parquet (local)	1000				partition_by (|create_dt_year=2018	rm -rf /tmp/sling/output8 ; sling run --src-stream file://cmd/sling/tests/files/test1.csv --tgt-object 'file:///tmp/sling/output8/{part_year}/{part_month}' -d --tgt-options '{ format: parquet }' --update-key create_dt ; ls -l /tmp/sling/output8
-60	Run sling writing to partitioned parquet (aws)	1002				partition_by (	sling run -d -r cmd/sling/tests/replications/r.17.yaml --mode full-refresh
-61	Run sling with incremental writing to partitioned parquet (aws)	40				partition_by (	sling run -d -r cmd/sling/tests/replications/r.17.yaml
+60	Run sling writing to partitioned parquet (aws)	1002				partition_by (	FORMAT=parquet sling run -d -r cmd/sling/tests/replications/r.17.yaml --mode full-refresh
+61	Run sling with incremental writing to partitioned parquet (aws)	40				partition_by (	FORMAT=parquet sling run -d -r cmd/sling/tests/replications/r.17.yaml
+62	Run sling writing to partitioned csv (aws)	1002				partition_by (	FORMAT=csv sling run -d -r cmd/sling/tests/replications/r.17.yaml --mode full-refresh
diff --git a/core/dbio/iop/compression.go b/core/dbio/iop/compression.go
index 80b8d948..d4468534 100755
--- a/core/dbio/iop/compression.go
+++ b/core/dbio/iop/compression.go
@@ -52,6 +52,16 @@ var AllCompressorType = []struct {
 	{ZStandardCompressorType, "ZStandardCompressorType"},
 }
 
+// Normalize converts to lowercase
+func (ct CompressorType) Normalize() *CompressorType {
+	return g.Ptr(CompressorType(ct.String()))
+}
+
+// String converts to lowercase
+func (ct CompressorType) String() string {
+	return strings.ToLower(string(ct))
+}
+
 // CompressorTypePtr returns a pointer to the CompressorType value passed in.
 func CompressorTypePtr(v CompressorType) *CompressorType {
 	return &v
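The new CompressorType methods normalize user-provided casing before the value reaches template rendering. A minimal sketch of the intended behavior (the import path is assumed from the repo layout, not stated in the diff):

    package main

    import (
    	"fmt"

    	"github.com/slingdata-io/sling-cli/core/dbio/iop"
    )

    func main() {
    	// mixed-case config input is lowered before hitting the DuckDB COPY template
    	ct := iop.CompressorType("GZIP")
    	fmt.Println(ct.String())     // prints: gzip
    	fmt.Println(*ct.Normalize()) // prints: gzip (Normalize returns a *CompressorType holding the lowercased value)
    }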
diff --git a/core/dbio/iop/duckdb.go b/core/dbio/iop/duckdb.go
index 71809263..ac190697 100644
--- a/core/dbio/iop/duckdb.go
+++ b/core/dbio/iop/duckdb.go
@@ -682,7 +682,8 @@ func (duck *DuckDb) initScanner() {
 }
 
 type DuckDbCopyOptions struct {
-	Compression        string
+	Format             dbio.FileType
+	Compression        CompressorType
 	PartitionFields    []PartitionLevel // part_year, part_month, part_day, etc.
 	PartitionKey       string
 	WritePartitionCols bool
@@ -733,9 +734,16 @@ func (duck *DuckDb) GenerateCopyStatement(fromTable, toLocalPath string, options
 	}
 
 	// validate compression
-	options.Compression = strings.ToLower(options.Compression)
-	if g.In(options.Compression, "", "none", "auto") {
-		options.Compression = "snappy"
+	fileExtensionExpr := ""
+	if options.Format == dbio.FileTypeParquet {
+		if g.In(options.Compression, "", "none", "auto") {
+			options.Compression = SnappyCompressorType
+		}
+	}
+	if options.Format == dbio.FileTypeCsv {
+		if g.In(options.Compression, GzipCompressorType) {
+			fileExtensionExpr = g.F("file_extension 'csv.gz',")
+		}
 	}
 
 	fileSizeBytesExpr := ""
@@ -753,22 +761,26 @@
 		}
 
 		sql = g.R(
-			dbio.TypeDbDuckDb.GetTemplateValue("core.export_parquet_partitions"),
+			dbio.TypeDbDuckDb.GetTemplateValue("core.export_to_local_partitions"),
 			"table", fromTable,
 			"local_path", toLocalPath,
+			"format", string(options.Format),
 			"file_size_bytes_expr", fileSizeBytesExpr,
+			"file_extension_expr", fileExtensionExpr,
 			"partition_expressions", strings.Join(partSqlExpressions, ", "),
 			"partition_columns", strings.Join(partSqlColumns, ", "),
-			"compression", cast.ToString(options.Compression),
+			"compression", string(options.Compression),
 			"write_partition_columns", cast.ToString(options.WritePartitionCols),
 		)
 	} else {
 		sql = g.R(
-			dbio.TypeDbDuckDb.GetTemplateValue("core.export_parquet"),
+			dbio.TypeDbDuckDb.GetTemplateValue("core.export_to_local"),
 			"table", fromTable,
 			"local_path", toLocalPath,
+			"format", string(options.Format),
 			"file_size_bytes_expr", fileSizeBytesExpr,
-			"compression", cast.ToString(options.Compression),
+			"file_extension_expr", fileExtensionExpr,
+			"compression", string(options.Compression),
 		)
 	}
 
diff --git a/core/dbio/templates/duckdb.yaml b/core/dbio/templates/duckdb.yaml
index e399c87c..ec529d4d 100755
--- a/core/dbio/templates/duckdb.yaml
+++ b/core/dbio/templates/duckdb.yaml
@@ -10,16 +10,16 @@ core:
   insert_option: ""
   modify_column: 'alter {column} type {type}'
   select_stream_scanner: select {fields} from {stream_scanner} {where}
-  export_parquet: |
+  export_to_local: |
     COPY (
       select *
      from {table}
     ) TO '{local_path}'
     (
-      format 'parquet', {file_size_bytes_expr}
+      format '{format}', {file_size_bytes_expr} {file_extension_expr}
       compression '{compression}'
     )
-  export_parquet_partitions: |
+  export_to_local_partitions: |
     COPY (
       select
        *,
@@ -27,7 +27,7 @@ core:
      from {table}
     ) TO '{local_path}'
     (
-      format 'parquet', {file_size_bytes_expr}
+      format '{format}', {file_size_bytes_expr} {file_extension_expr}
       compression '{compression}',
       overwrite true,
       write_partition_columns {write_partition_columns},
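Under the renamed templates, a gzip CSV export renders with format 'csv' plus file_extension 'csv.gz', while parquet keeps its snappy default. A sketch of filling the new options for a partitioned CSV target, mirroring how writeDataflowViaDuckDB populates them further below (the URI, update key value, and import paths are illustrative, not from the diff):

    package main

    import (
    	"fmt"

    	"github.com/slingdata-io/sling-cli/core/dbio"
    	"github.com/slingdata-io/sling-cli/core/dbio/iop"
    )

    func main() {
    	// hypothetical partitioned target URI
    	uri := "s3://bucket/test/{part_year}/{part_month}"

    	copyOptions := iop.DuckDbCopyOptions{
    		Format:             dbio.FileTypeCsv,
    		Compression:        iop.GzipCompressorType, // triggers file_extension 'csv.gz'
    		PartitionFields:    iop.ExtractPartitionFields(uri),
    		PartitionKey:       "create_dt", // illustrative update key
    		WritePartitionCols: true,
    	}
    	fmt.Println(copyOptions.Format, copyOptions.Compression) // csv gzip
    }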
diff --git a/core/sling/config.go b/core/sling/config.go
index a7014064..4222cf7d 100644
--- a/core/sling/config.go
+++ b/core/sling/config.go
@@ -1479,6 +1479,10 @@ func (o *TargetOptions) SetDefaults(targetOptions TargetOptions) {
 	if o.Compression == nil {
 		o.Compression = targetOptions.Compression
 	}
+	if o.Compression != nil {
+		o.Compression = o.Compression.Normalize()
+	}
+
 	if o.Format == dbio.FileTypeNone {
 		o.Format = targetOptions.Format
 	}
diff --git a/core/sling/task.go b/core/sling/task.go
index aed6ed8d..8d6f7760 100644
--- a/core/sling/task.go
+++ b/core/sling/task.go
@@ -418,9 +418,9 @@ func (t *TaskExecution) Cleanup() {
 }
 
 // shouldWriteViaDuckDB determines whether we should use duckdb
-// at the moment, use duckdb only for partitioned target parquet files
+// at the moment, use duckdb only for partitioned target parquet or csv files
 func (t *TaskExecution) shouldWriteViaDuckDB(uri string) bool {
-	if g.In(t.Config.Target.ObjectFileFormat(), dbio.FileTypeParquet) {
+	if g.In(t.Config.Target.ObjectFileFormat(), dbio.FileTypeParquet, dbio.FileTypeCsv) {
 		return len(iop.ExtractPartitionFields(uri)) > 0
 	}
 	return false
@@ -431,11 +431,16 @@ func (t *TaskExecution) isIncrementalWithUpdateKey() bool {
 	return t.Config.Source.HasUpdateKey() && t.Config.Mode == IncrementalMode
 }
 
-// isIncrementalStateWithUpdateKey means it has an update_key and is incremental mode via sling state
+// isIncrementalStateWithUpdateKey means it has an update_key, a provided sling state, and is in incremental mode
 func (t *TaskExecution) isIncrementalStateWithUpdateKey() bool {
 	return os.Getenv("SLING_STATE") != "" && t.isIncrementalWithUpdateKey()
 }
 
+// hasStateWithUpdateKey means it has an update_key and a provided sling state
+func (t *TaskExecution) hasStateWithUpdateKey() bool {
+	return os.Getenv("SLING_STATE") != "" && t.Config.Source.HasUpdateKey()
+}
+
 func (t *TaskExecution) getOptionsMap() (options map[string]any) {
 	options = g.M()
 	g.Unmarshal(g.Marshal(t.Config.Source.Options), &options)
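With hasStateWithUpdateKey, persisting the incremental value no longer requires incremental mode, only SLING_STATE plus an update_key. A quick comparison of the two helpers, derived from the definitions above (assuming SLING_STATE is set and an update_key is configured):

    mode           isIncrementalStateWithUpdateKey()   hasStateWithUpdateKey()
    incremental    true                                true
    backfill       false                               true
    full-refresh   false                               true

This appears to be what lets the suite's full-refresh runs (tests 60 and 62) seed the state that the subsequent incremental run (test 61) picks up.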
diff --git a/core/sling/task_run.go b/core/sling/task_run.go
index 3f0dd785..3ab7fd55 100644
--- a/core/sling/task_run.go
+++ b/core/sling/task_run.go
@@ -411,7 +411,7 @@ func (t *TaskExecution) runDbToFile() (err error) {
 		err = g.Error(err, "Error running runDbToFile")
 	}
 
-	if cnt > 0 && t.isIncrementalStateWithUpdateKey() {
+	if cnt > 0 && t.hasStateWithUpdateKey() {
 		if err = setIncrementalValueViaState(t); err != nil {
 			err = g.Error(err, "Could not set incremental value")
 			return err
@@ -649,7 +649,7 @@ func (t *TaskExecution) runDbToDb() (err error) {
 		err = g.Error(t.df.Err(), "Error running runDbToDb")
 	}
 
-	if cnt > 0 && t.isIncrementalStateWithUpdateKey() {
+	if cnt > 0 && t.hasStateWithUpdateKey() {
 		if err = setIncrementalValueViaState(t); err != nil {
 			err = g.Error(err, "Could not set incremental value")
 			return err
diff --git a/core/sling/task_run_read.go b/core/sling/task_run_read.go
index 42ab54ae..f0103809 100644
--- a/core/sling/task_run_read.go
+++ b/core/sling/task_run_read.go
@@ -69,7 +69,7 @@ func (t *TaskExecution) ReadFromDB(cfg *Config, srcConn database.Connection) (df
 		selectFieldsStr = strings.Join(fields, ", ")
 	}
 
-	if t.isIncrementalWithUpdateKey() || t.Config.Mode == BackfillMode {
+	if t.isIncrementalWithUpdateKey() || t.hasStateWithUpdateKey() || t.Config.Mode == BackfillMode {
 		// default true value
 		incrementalWhereCond := "1=1"
 
@@ -145,8 +145,8 @@ func (t *TaskExecution) ReadFromDB(cfg *Config, srcConn database.Connection) (df
 				"update_key", srcConn.Quote(cfg.Source.UpdateKey, false),
 			)
 		} else {
-			if !(strings.Contains(sTable.SQL, "{incremental_where_cond}") || strings.Contains(sTable.SQL, "{incremental_value}")) {
-				err = g.Error("Since using incremental/backfill mode + custom SQL, with an `update_key`, the SQL text needs to contain a placeholder: {incremental_where_cond} or {incremental_value}. See https://docs.slingdata.io for help.")
+			if g.In(t.Config.Mode, IncrementalMode, BackfillMode) && !(strings.Contains(sTable.SQL, "{incremental_where_cond}") || strings.Contains(sTable.SQL, "{incremental_value}")) {
+				err = g.Error("Since using %s mode with custom SQL and an `update_key`, the SQL text needs to contain a placeholder: {incremental_where_cond} or {incremental_value}. See https://docs.slingdata.io for help.", t.Config.Mode)
 				return t.df, err
 			}
diff --git a/core/sling/task_run_write.go b/core/sling/task_run_write.go
index 0042a26c..3be8a46a 100644
--- a/core/sling/task_run_write.go
+++ b/core/sling/task_run_write.go
@@ -818,7 +818,7 @@ func writeDataflowViaDuckDB(t *TaskExecution, df *iop.Dataflow, fs filesys.FileS
 	// push to temp duck file
 	var duckConn database.Connection
 
-	tempTable, _ := database.ParseTableName("main.sling_parquet_temp", dbio.TypeDbDuckDb)
+	tempTable, _ := database.ParseTableName("main.sling_temp", dbio.TypeDbDuckDb)
 	folder := path.Join(env.GetTempFolder(), "duckdb", g.RandSuffix(tempTable.Name, 3))
 	defer env.RemoveAllLocalTempFile(folder)
 
@@ -854,7 +854,8 @@ func writeDataflowViaDuckDB(t *TaskExecution, df *iop.Dataflow, fs filesys.FileS
 	duck := duckConn.(*database.DuckDbConn).DuckDb()
 
 	copyOptions := iop.DuckDbCopyOptions{
-		Compression:        string(g.PtrVal(t.Config.Target.Options.Compression)),
+		Format:             t.Config.Target.ObjectFileFormat(),
+		Compression:        g.PtrVal(t.Config.Target.Options.Compression),
 		PartitionFields:    iop.ExtractPartitionFields(uri),
 		PartitionKey:       t.Config.Source.UpdateKey,
 		WritePartitionCols: true,
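For local verification, the updated suite rows drive the same replication under both formats by exporting FORMAT before the run, e.g.:

    FORMAT=parquet sling run -d -r cmd/sling/tests/replications/r.17.yaml --mode full-refresh
    FORMAT=csv sling run -d -r cmd/sling/tests/replications/r.17.yaml --mode full-refresh

r.17.yaml reads ${FORMAT} into its env block (both the format variable and the SLING_STATE path), so each format writes to its own object prefix and keeps a separate state namespace.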