partition csv format & optimize sling state handling
- updated sling state handling to support an update key with incremental mode and a provided sling state
- improved logic for determining whether to use duckdb for writing data
- optimized the condition check for incremental state with an update key
flarco committed Dec 24, 2024
1 parent f215231 commit 0ba1b92
Showing 10 changed files with 61 additions and 27 deletions.
7 changes: 4 additions & 3 deletions cmd/sling/tests/replications/r.17.yaml
@@ -9,9 +9,10 @@ streams:
update_key: update_dt
primary_key: id
mode: incremental
object: test/{stream_name}/{part_year}/{part_month}
object: test/{stream_name}_{format}/{part_year}/{part_month}
target_options:
format: parquet
format: '{format}'

env:
SLING_STATE: AWS_S3/state/r.17
SLING_STATE: AWS_S3/state/r.17/${FORMAT}
format: ${FORMAT}
5 changes: 3 additions & 2 deletions cmd/sling/tests/suite.cli.tsv
@@ -58,5 +58,6 @@ n test_name rows bytes streams fails output_contains command
57 Run sling with incremental (delete missing soft) 0 and not exists ( sling run -d --src-conn postgres --src-stream 'select * from public.test1k_postgres_pg where {incremental_where_cond} limit 900' --tgt-conn mysql --tgt-object 'mysql.public_test1k_postgres_pg' --mode incremental --primary-key id --update-key create_dt --tgt-options '{ delete_missing: soft }'
58 Run sling with incremental (delete missing hard) 0 and not exists ( sling run -d --src-conn postgres --src-stream 'select * from public.test1k_postgres_pg where {incremental_where_cond} limit 900' --tgt-conn mysql --tgt-object 'mysql.public_test1k_postgres_pg' --mode incremental --primary-key id --update-key create_dt --tgt-options '{ delete_missing: hard }'
59 Run sling writing to partitioned parquet (local) 1000 partition_by (|create_dt_year=2018 rm -rf /tmp/sling/output8 ; sling run --src-stream file://cmd/sling/tests/files/test1.csv --tgt-object 'file:///tmp/sling/output8/{part_year}/{part_month}' -d --tgt-options '{ format: parquet }' --update-key create_dt ; ls -l /tmp/sling/output8
60 Run sling writing to partitioned parquet (aws) 1002 partition_by ( sling run -d -r cmd/sling/tests/replications/r.17.yaml --mode full-refresh
61 Run sling with incremental writing to partitioned parquet (aws) 40 partition_by ( sling run -d -r cmd/sling/tests/replications/r.17.yaml
60 Run sling writing to partitioned parquet (aws) 1002 partition_by ( FORMAT=parquet sling run -d -r cmd/sling/tests/replications/r.17.yaml --mode full-refresh
61 Run sling with incremental writing to partitioned parquet (aws) 40 partition_by ( FORMAT=parquet sling run -d -r cmd/sling/tests/replications/r.17.yaml
62 Run sling writing to partitioned csv (aws) 1002 partition_by ( FORMAT=csv sling run -d -r cmd/sling/tests/replications/r.17.yaml --mode full-refresh
10 changes: 10 additions & 0 deletions core/dbio/iop/compression.go
Expand Up @@ -52,6 +52,16 @@ var AllCompressorType = []struct {
{ZStandardCompressorType, "ZStandardCompressorType"},
}

// Normalize converts to lowercase
func (ct CompressorType) Normalize() *CompressorType {
return g.Ptr(CompressorType(ct.String()))
}

// String converts to lowercase
func (ct CompressorType) String() string {
return strings.ToLower(string(ct))
}

// CompressorTypePtr returns a pointer to the CompressorType value passed in.
func CompressorTypePtr(v CompressorType) *CompressorType {
return &v
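A minimal usage sketch of the new helpers (not part of the commit; the module import path is an assumption):

package main

import (
	"fmt"

	"github.com/slingdata-io/sling-cli/core/dbio/iop" // assumed module path
)

func main() {
	// A user-supplied value like "GZIP" lower-cases via String()/Normalize(),
	// so comparisons such as g.In(options.Compression, GzipCompressorType) in
	// duckdb.go below match regardless of the casing used in the config.
	ct := iop.CompressorType("GZIP")
	fmt.Println(ct.String())             // gzip
	fmt.Println(ct.Normalize().String()) // gzip
}
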
28 changes: 20 additions & 8 deletions core/dbio/iop/duckdb.go
@@ -682,7 +682,8 @@ func (duck *DuckDb) initScanner() {
}

type DuckDbCopyOptions struct {
Compression string
Format dbio.FileType
Compression CompressorType
PartitionFields []PartitionLevel // part_year, part_month, part_day, etc.
PartitionKey string
WritePartitionCols bool
@@ -733,9 +734,16 @@ func (duck *DuckDb) GenerateCopyStatement(fromTable, toLocalPath string, options
}

// validate compression
options.Compression = strings.ToLower(options.Compression)
if g.In(options.Compression, "", "none", "auto") {
options.Compression = "snappy"
fileExtensionExpr := ""
if options.Format == dbio.FileTypeParquet {
if g.In(options.Compression, "", "none", "auto") {
options.Compression = SnappyCompressorType
}
}
if options.Format == dbio.FileTypeCsv {
if g.In(options.Compression, GzipCompressorType) {
fileExtensionExpr = g.F("file_extension 'csv.gz',")
}
}

fileSizeBytesExpr := ""
@@ -753,22 +761,26 @@ func (duck *DuckDb) GenerateCopyStatement(fromTable, toLocalPath string, options
}

sql = g.R(
dbio.TypeDbDuckDb.GetTemplateValue("core.export_parquet_partitions"),
dbio.TypeDbDuckDb.GetTemplateValue("core.export_to_local_partitions"),
"table", fromTable,
"local_path", toLocalPath,
"format", string(options.Format),
"file_size_bytes_expr", fileSizeBytesExpr,
"file_extension_expr", fileExtensionExpr,
"partition_expressions", strings.Join(partSqlExpressions, ", "),
"partition_columns", strings.Join(partSqlColumns, ", "),
"compression", cast.ToString(options.Compression),
"compression", string(options.Compression),
"write_partition_columns", cast.ToString(options.WritePartitionCols),
)
} else {
sql = g.R(
dbio.TypeDbDuckDb.GetTemplateValue("core.export_parquet"),
dbio.TypeDbDuckDb.GetTemplateValue("core.export_to_local"),
"table", fromTable,
"local_path", toLocalPath,
"format", string(options.Format),
"file_size_bytes_expr", fileSizeBytesExpr,
"compression", cast.ToString(options.Compression),
"file_extension_expr", fileExtensionExpr,
"compression", string(options.Compression),
)
}

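For orientation (not part of the commit), a rough caller sketch of the widened copy options; the return shape of GenerateCopyStatement and the module import paths are assumptions based on the diff above:

package example

import (
	"github.com/slingdata-io/sling-cli/core/dbio"     // assumed module path
	"github.com/slingdata-io/sling-cli/core/dbio/iop" // assumed module path
)

// exportPartitionedCsv sketches how a partitioned csv export might be set up;
// duck is an already-initialized *iop.DuckDb.
func exportPartitionedCsv(duck *iop.DuckDb, uri string) (string, error) {
	opts := iop.DuckDbCopyOptions{
		Format:             dbio.FileTypeCsv,                // csv now handled alongside parquet
		Compression:        iop.GzipCompressorType,          // yields file_extension 'csv.gz' for csv
		PartitionFields:    iop.ExtractPartitionFields(uri), // e.g. part_year, part_month from the target uri
		PartitionKey:       "create_dt",                     // example update key
		WritePartitionCols: true,
	}
	// With partition fields present, this renders core.export_to_local_partitions.
	return duck.GenerateCopyStatement("main.sling_temp", "/tmp/sling/output", opts)
}
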
8 changes: 4 additions & 4 deletions core/dbio/templates/duckdb.yaml
@@ -10,24 +10,24 @@ core:
insert_option: ""
modify_column: 'alter {column} type {type}'
select_stream_scanner: select {fields} from {stream_scanner} {where}
export_parquet: |
export_to_local: |
COPY (
select *
from {table}
) TO '{local_path}'
(
format 'parquet', {file_size_bytes_expr}
format '{format}', {file_size_bytes_expr} {file_extension_expr}
compression '{compression}'
)
export_parquet_partitions: |
export_to_local_partitions: |
COPY (
select
*,
{partition_expressions}
from {table}
) TO '{local_path}'
(
format 'parquet', {file_size_bytes_expr}
format '{format}', {file_size_bytes_expr} {file_extension_expr}
compression '{compression}',
overwrite true,
write_partition_columns {write_partition_columns},
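To make the new placeholders concrete, a hand-filled illustration of export_to_local_partitions for a gzip'd csv target (the partition expressions and the options after write_partition_columns are generated by the Go code and left symbolic here):

package example

// exampleCopySQL is an illustrative rendering only, not generated output.
const exampleCopySQL = `
COPY (
  select
    *,
    {partition_expressions}   -- e.g. year/month columns derived from the update key
  from main.sling_temp
) TO '/tmp/sling/output'
(
  format 'csv',  file_extension 'csv.gz',
  compression 'gzip',
  overwrite true,
  write_partition_columns true
  -- remaining options from the truncated part of the template
)`
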
4 changes: 4 additions & 0 deletions core/sling/config.go
@@ -1479,6 +1479,10 @@ func (o *TargetOptions) SetDefaults(targetOptions TargetOptions) {
if o.Compression == nil {
o.Compression = targetOptions.Compression
}
if o.Compression != nil {
o.Compression = o.Compression.Normalize()
}

if o.Format == dbio.FileTypeNone {
o.Format = targetOptions.Format
}
11 changes: 8 additions & 3 deletions core/sling/task.go
@@ -418,9 +418,9 @@ func (t *TaskExecution) Cleanup() {
}

// shouldWriteViaDuckDB determines whether we should use duckdb
// at the moment, use duckdb only for partitioned target parquet files
// at the moment, use duckdb only for partitioned target parquet or csv files
func (t *TaskExecution) shouldWriteViaDuckDB(uri string) bool {
if g.In(t.Config.Target.ObjectFileFormat(), dbio.FileTypeParquet) {
if g.In(t.Config.Target.ObjectFileFormat(), dbio.FileTypeParquet, dbio.FileTypeCsv) {
return len(iop.ExtractPartitionFields(uri)) > 0
}
return false
@@ -431,11 +431,16 @@ func (t *TaskExecution) isIncrementalWithUpdateKey() bool {
return t.Config.Source.HasUpdateKey() && t.Config.Mode == IncrementalMode
}

// isIncrementalStateWithUpdateKey means it has an update_key and is incremental mode via sling state
// isIncrementalStateWithUpdateKey means it has an update_key, a provided sling state, and is in incremental mode
func (t *TaskExecution) isIncrementalStateWithUpdateKey() bool {
return os.Getenv("SLING_STATE") != "" && t.isIncrementalWithUpdateKey()
}

// hasStateWithUpdateKey means it has an update_key and a provided sling state
func (t *TaskExecution) hasStateWithUpdateKey() bool {
return os.Getenv("SLING_STATE") != "" && t.Config.Source.HasUpdateKey()
}

func (t *TaskExecution) getOptionsMap() (options map[string]any) {
options = g.M()
g.Unmarshal(g.Marshal(t.Config.Source.Options), &options)
4 changes: 2 additions & 2 deletions core/sling/task_run.go
@@ -411,7 +411,7 @@ func (t *TaskExecution) runDbToFile() (err error) {
err = g.Error(err, "Error running runDbToFile")
}

if cnt > 0 && t.isIncrementalStateWithUpdateKey() {
if cnt > 0 && t.hasStateWithUpdateKey() {
if err = setIncrementalValueViaState(t); err != nil {
err = g.Error(err, "Could not set incremental value")
return err
@@ -649,7 +649,7 @@ func (t *TaskExecution) runDbToDb() (err error) {
err = g.Error(t.df.Err(), "Error running runDbToDb")
}

if cnt > 0 && t.isIncrementalStateWithUpdateKey() {
if cnt > 0 && t.hasStateWithUpdateKey() {
if err = setIncrementalValueViaState(t); err != nil {
err = g.Error(err, "Could not set incremental value")
return err
6 changes: 3 additions & 3 deletions core/sling/task_run_read.go
@@ -69,7 +69,7 @@ func (t *TaskExecution) ReadFromDB(cfg *Config, srcConn database.Connection) (df
selectFieldsStr = strings.Join(fields, ", ")
}

if t.isIncrementalWithUpdateKey() || t.Config.Mode == BackfillMode {
if t.isIncrementalWithUpdateKey() || t.hasStateWithUpdateKey() || t.Config.Mode == BackfillMode {
// default true value
incrementalWhereCond := "1=1"

@@ -145,8 +145,8 @@
"update_key", srcConn.Quote(cfg.Source.UpdateKey, false),
)
} else {
if !(strings.Contains(sTable.SQL, "{incremental_where_cond}") || strings.Contains(sTable.SQL, "{incremental_value}")) {
err = g.Error("Since using incremental/backfill mode + custom SQL, with an `update_key`, the SQL text needs to contain a placeholder: {incremental_where_cond} or {incremental_value}. See https://docs.slingdata.io for help.")
if g.In(t.Config.Mode, IncrementalMode, BackfillMode) && !(strings.Contains(sTable.SQL, "{incremental_where_cond}") || strings.Contains(sTable.SQL, "{incremental_value}")) {
err = g.Error("Since using %s mode + custom SQL, with an `update_key`, the SQL text needs to contain a placeholder: {incremental_where_cond} or {incremental_value}. See https://docs.slingdata.io for help.", t.Config.Mode)
return t.df, err
}

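A small sketch of the placeholder requirement referenced in the error above, mirroring the custom-SQL streams in suite.cli.tsv (the substituted condition is an invented example):

package example

// customSQL shows the placeholder a custom SQL stream must carry when running
// incremental or backfill mode with an update_key.
const customSQL = `select * from public.test1k_postgres_pg where {incremental_where_cond} limit 900`

// At runtime sling replaces {incremental_where_cond} with a real condition,
// for example: create_dt > '2024-12-20 00:00:00'
// and falls back to the default "1=1" when no prior incremental value exists.
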
5 changes: 3 additions & 2 deletions core/sling/task_run_write.go
@@ -818,7 +818,7 @@ func writeDataflowViaDuckDB(t *TaskExecution, df *iop.Dataflow, fs filesys.FileS
// push to temp duck file
var duckConn database.Connection

tempTable, _ := database.ParseTableName("main.sling_parquet_temp", dbio.TypeDbDuckDb)
tempTable, _ := database.ParseTableName("main.sling_temp", dbio.TypeDbDuckDb)
folder := path.Join(env.GetTempFolder(), "duckdb", g.RandSuffix(tempTable.Name, 3))
defer env.RemoveAllLocalTempFile(folder)

@@ -854,7 +854,8 @@
duck := duckConn.(*database.DuckDbConn).DuckDb()

copyOptions := iop.DuckDbCopyOptions{
Compression: string(g.PtrVal(t.Config.Target.Options.Compression)),
Format: t.Config.Target.ObjectFileFormat(),
Compression: g.PtrVal(t.Config.Target.Options.Compression),
PartitionFields: iop.ExtractPartitionFields(uri),
PartitionKey: t.Config.Source.UpdateKey,
WritePartitionCols: true,
