Skip to content

Commit

Permalink
[Snowflake] Clean up staging files after a failed COPY INTO (#1069)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Dec 2, 2024
1 parent de3f88e commit 0a635d1
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 0 deletions.
6 changes: 6 additions & 0 deletions clients/snowflake/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dialect

import (
"fmt"
"path/filepath"
"strings"

"github.com/artie-labs/transfer/lib/config/constants"
Expand Down Expand Up @@ -152,3 +153,8 @@ FROM
WHERE
UPPER(table_schema) = UPPER(?) AND table_name ILIKE ?`, dbName), []any{schemaName, "%" + constants.ArtiePrefix + "%"}
}

// BuildRemoveFilesFromStage returns a Snowflake REMOVE statement that targets
// the stage location formed by joining stageName and path.
//
// An empty path yields "REMOVE @<stageName>". The joined location is cleaned by
// filepath.Join (empty elements are dropped, redundant separators collapsed).
// Stage locations are always slash-separated, so the result is passed through
// filepath.ToSlash to keep the statement identical on platforms whose native
// separator is not "/".
//
// Both arguments are interpolated verbatim; no quoting or escaping is applied.
//
// https://docs.snowflake.com/en/sql-reference/sql/remove
func (SnowflakeDialect) BuildRemoveFilesFromStage(stageName string, path string) string {
	// filepath.Join uses the OS separator; normalize it so the SQL never contains backslashes.
	location := filepath.ToSlash(filepath.Join(stageName, path))
	return fmt.Sprintf("REMOVE @%s", location)
}
17 changes: 17 additions & 0 deletions clients/snowflake/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,3 +225,20 @@ WHEN MATCHED AND stg."__ARTIE_DELETE" THEN DELETE
WHEN MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN UPDATE SET "GROUP"=stg."GROUP","ID"=stg."ID","START"=stg."START","UPDATED_AT"=stg."UPDATED_AT"
WHEN NOT MATCHED AND IFNULL(stg."__ARTIE_DELETE", false) = false THEN INSERT ("GROUP","ID","START","UPDATED_AT") VALUES (stg."GROUP",stg."ID",stg."START",stg."UPDATED_AT");`, statements[0])
}

func TestSnowflakeDialect_BuildRemoveAllFilesFromStage(t *testing.T) {
{
// Stage name only, no path
assert.Equal(t,
"REMOVE @STAGE_NAME",
SnowflakeDialect{}.BuildRemoveFilesFromStage("STAGE_NAME", ""),
)
}
{
// Stage name and path
assert.Equal(t,
"REMOVE @STAGE_NAME/path1/subpath2",
SnowflakeDialect{}.BuildRemoveFilesFromStage("STAGE_NAME", "path1/subpath2"),
)
}
}
9 changes: 9 additions & 0 deletions clients/snowflake/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,15 @@ func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimizati
}

if _, err = s.Exec(copyCommand); err != nil {
// For non-temp tables, we should try to delete the staging file if COPY INTO fails.
// This is because [PURGE = TRUE] will only delete the staging files upon a successful COPY INTO.
// We also only need to do this for non-temp tables because these staging files will linger, since we create a new temporary table per attempt.
if !createTempTable {
if _, deleteErr := s.ExecContext(ctx, s.dialect().BuildRemoveFilesFromStage(tempTableID.FullyQualifiedName(), "")); deleteErr != nil {
slog.Warn("Failed to remove all files from stage", slog.Any("deleteErr", deleteErr))
}
}

return fmt.Errorf("failed to run copy into temporary table: %w", err)
}

Expand Down

0 comments on commit 0a635d1

Please sign in to comment.