Skip to content

Commit

Permalink
WIP.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Jun 27, 2024
1 parent 39bb051 commit 8e66810
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 18 deletions.
42 changes: 24 additions & 18 deletions clients/bigquery/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strings"
"time"

"github.com/artie-labs/transfer/lib/typing/decimal"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
Expand Down Expand Up @@ -124,6 +126,10 @@ func (BigQueryDialect) IsTableDoesNotExistErr(_ error) bool {
return false
}

func (b BigQueryDialect) IncreaseNumericType(tableID sql.TableIdentifier, column columns.Column, newDetails decimal.DecimalDetails) string {
return fmt.Sprintf("ALTER TABLE %s ALTER COLUMN %s SET DATA TYPE %s", tableID.FullyQualifiedName(), b.QuoteIdentifier(column.Name()), newDetails.BigQueryKind())
}

func (BigQueryDialect) BuildCreateTableQuery(tableID sql.TableIdentifier, temporary bool, colSQLParts []string) string {
query := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s)", tableID.FullyQualifiedName(), strings.Join(colSQLParts, ","))

Expand All @@ -142,17 +148,17 @@ func (BigQueryDialect) BuildAlterColumnQuery(tableID sql.TableIdentifier, column
return fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", tableID.FullyQualifiedName(), columnOp, colSQLPart)
}

func (bd BigQueryDialect) BuildIsNotToastValueExpression(tableAlias constants.TableAlias, column columns.Column) string {
colName := sql.QuoteTableAliasColumn(tableAlias, column, bd)
func (b BigQueryDialect) BuildIsNotToastValueExpression(tableAlias constants.TableAlias, column columns.Column) string {
colName := sql.QuoteTableAliasColumn(tableAlias, column, b)
if column.KindDetails == typing.Struct {
return fmt.Sprintf(`COALESCE(TO_JSON_STRING(%s) != '{"key":"%s"}', true)`,
colName, constants.ToastUnavailableValuePlaceholder)
}
return fmt.Sprintf("COALESCE(%s != '%s', true)", colName, constants.ToastUnavailableValuePlaceholder)
}

func (bd BigQueryDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, primaryKeys []string) string {
primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, bd)
func (b BigQueryDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, primaryKeys []string) string {
primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, b)

// BigQuery does not like DISTINCT for JSON columns, so we wrote this instead.
// Error: Column foo of type JSON cannot be used in SELECT DISTINCT
Expand All @@ -163,12 +169,12 @@ func (bd BigQueryDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, pri
)
}

func (bd BigQueryDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) []string {
primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, bd)
func (b BigQueryDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) []string {
primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, b)

orderColsToIterate := primaryKeysEscaped
if includeArtieUpdatedAt {
orderColsToIterate = append(orderColsToIterate, bd.QuoteIdentifier(constants.UpdateColumnMarker))
orderColsToIterate = append(orderColsToIterate, b.QuoteIdentifier(constants.UpdateColumnMarker))
}

var orderByCols []string
Expand Down Expand Up @@ -205,7 +211,7 @@ func (bd BigQueryDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableId
return parts
}

func (bd BigQueryDialect) BuildMergeQueries(
func (b BigQueryDialect) BuildMergeQueries(
tableID sql.TableIdentifier,
subQuery string,
idempotentKey string,
Expand All @@ -229,13 +235,13 @@ func (bd BigQueryDialect) BuildMergeQueries(

var equalitySQLParts []string
for _, primaryKey := range primaryKeys {
equalitySQL := sql.BuildColumnComparison(primaryKey, constants.TargetAlias, constants.StagingAlias, sql.Equal, bd)
equalitySQL := sql.BuildColumnComparison(primaryKey, constants.TargetAlias, constants.StagingAlias, sql.Equal, b)

if primaryKey.KindDetails.Kind == typing.Struct.Kind {
// BigQuery requires special casting to compare two JSON objects.
equalitySQL = fmt.Sprintf("TO_JSON_STRING(%s) = TO_JSON_STRING(%s)",
sql.QuoteTableAliasColumn(constants.TargetAlias, primaryKey, bd),
sql.QuoteTableAliasColumn(constants.StagingAlias, primaryKey, bd))
sql.QuoteTableAliasColumn(constants.TargetAlias, primaryKey, b),
sql.QuoteTableAliasColumn(constants.StagingAlias, primaryKey, b))
}

equalitySQLParts = append(equalitySQLParts, equalitySQL)
Expand All @@ -254,11 +260,11 @@ MERGE INTO %s %s USING %s AS %s ON %s`,
WHEN MATCHED %sTHEN UPDATE SET %s
WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s);`,
// WHEN MATCHED %sTHEN UPDATE SET %s
idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, bd),
idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, b),
// WHEN NOT MATCHED THEN INSERT (%s)
strings.Join(sql.QuoteColumns(cols, bd), ","),
strings.Join(sql.QuoteColumns(cols, b), ","),
// VALUES (%s);
strings.Join(sql.QuoteTableAliasColumns(constants.StagingAlias, cols, bd), ","),
strings.Join(sql.QuoteTableAliasColumns(constants.StagingAlias, cols, b), ","),
)}, nil
}

Expand All @@ -273,12 +279,12 @@ WHEN MATCHED AND %s THEN DELETE
WHEN MATCHED AND IFNULL(%s, false) = false %sTHEN UPDATE SET %s
WHEN NOT MATCHED AND IFNULL(%s, false) = false THEN INSERT (%s) VALUES (%s);`,
// WHEN MATCHED AND %s THEN DELETE
sql.QuotedDeleteColumnMarker(constants.StagingAlias, bd),
sql.QuotedDeleteColumnMarker(constants.StagingAlias, b),
// WHEN MATCHED AND IFNULL(%s, false) = false %sTHEN UPDATE SET %s
sql.QuotedDeleteColumnMarker(constants.StagingAlias, bd), idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, bd),
sql.QuotedDeleteColumnMarker(constants.StagingAlias, b), idempotentClause, sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, b),
// WHEN NOT MATCHED AND IFNULL(%s, false) = false THEN INSERT (%s)
sql.QuotedDeleteColumnMarker(constants.StagingAlias, bd), strings.Join(sql.QuoteColumns(cols, bd), ","),
sql.QuotedDeleteColumnMarker(constants.StagingAlias, b), strings.Join(sql.QuoteColumns(cols, b), ","),
// VALUES (%s);
strings.Join(sql.QuoteTableAliasColumns(constants.StagingAlias, cols, bd), ","),
strings.Join(sql.QuoteTableAliasColumns(constants.StagingAlias, cols, b), ","),
)}, nil
}
2 changes: 2 additions & 0 deletions lib/sql/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
"github.com/artie-labs/transfer/lib/typing/decimal"
)

type TableIdentifier interface {
Expand All @@ -20,6 +21,7 @@ type Dialect interface {
KindForDataType(_type string, stringPrecision string) (typing.KindDetails, error)
IsColumnAlreadyExistsErr(err error) bool
IsTableDoesNotExistErr(err error) bool
IncreaseNumericType(tableID TableIdentifier, column columns.Column, newDetails decimal.DecimalDetails) string
BuildCreateTableQuery(tableID TableIdentifier, temporary bool, colSQLParts []string) string
BuildAlterColumnQuery(tableID TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string
BuildIsNotToastValueExpression(tableAlias constants.TableAlias, column columns.Column) string
Expand Down

0 comments on commit 8e66810

Please sign in to comment.