Skip to content

Commit

Permalink
Support TIMESTAMP_NTZ (#962)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Oct 17, 2024
1 parent f8d1a64 commit c909d84
Show file tree
Hide file tree
Showing 19 changed files with 172 additions and 60 deletions.
6 changes: 5 additions & 1 deletion clients/bigquery/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ func (BigQueryDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool) s
// https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#datetime_type
// We should be using TIMESTAMP since it's an absolute point in time.
return "timestamp"
case ext.TimestampNTZKindType:
return "datetime"
case ext.DateKindType:
return "date"
case ext.TimeKindType:
Expand Down Expand Up @@ -103,8 +105,10 @@ func (BigQueryDialect) KindForDataType(rawBqType string, _ string) (typing.KindD
return typing.Struct, nil
case "array":
return typing.Array, nil
case "datetime", "timestamp":
case "timestamp":
return typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType), nil
case "datetime":
return typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimestampNTZKindType), nil
case "time":
return typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimeKindType), nil
case "date":
Expand Down
7 changes: 5 additions & 2 deletions clients/databricks/dialect/typing.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ func (DatabricksDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool)
case typing.ETime.Kind:
switch kindDetails.ExtendedTimeDetails.Type {
case ext.TimestampTzKindType:
// Using datetime2 because it's the recommendation, and it provides more precision: https://stackoverflow.com/a/1884088
return "TIMESTAMP"
case ext.TimestampNTZKindType:
// This is currently in public preview; to use it, the customer will need to enable [timestampNtz] on their Delta tables.
// Ref: https://docs.databricks.com/en/sql/language-manual/data-types/timestamp-ntz-type.html
return "TIMESTAMP_NTZ"
case ext.DateKindType:
return "DATE"
case ext.TimeKindType:
Expand Down Expand Up @@ -73,7 +76,7 @@ func (DatabricksDialect) KindForDataType(rawType string, _ string) (typing.KindD
case "timestamp":
return typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType), nil
case "timestamp_ntz":
return typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType), nil
return typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimestampNTZKindType), nil
}

return typing.Invalid, fmt.Errorf("unsupported data type: %q", rawType)
Expand Down
2 changes: 1 addition & 1 deletion clients/databricks/dialect/typing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func TestDatabricksDialect_KindForDataType(t *testing.T) {
// Timestamp NTZ
kd, err := DatabricksDialect{}.KindForDataType("TIMESTAMP_NTZ", "")
assert.NoError(t, err)
assert.Equal(t, typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType), kd)
assert.Equal(t, typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimestampNTZKindType), kd)
}
{
// Variant
Expand Down
4 changes: 4 additions & 0 deletions clients/mssql/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func (MSSQLDialect) DataTypeForKind(kindDetails typing.KindDetails, isPk bool) s
case typing.ETime.Kind:
switch kindDetails.ExtendedTimeDetails.Type {
case ext.TimestampTzKindType:
return "datetimeoffset"
case ext.TimestampNTZKindType:
// Using datetime2 because it's the recommendation, and it provides more precision: https://stackoverflow.com/a/1884088
return "datetime2"
case ext.DateKindType:
Expand Down Expand Up @@ -114,6 +116,8 @@ func (MSSQLDialect) KindForDataType(rawType string, stringPrecision string) (typ
case
"datetime",
"datetime2":
return typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimestampNTZKindType), nil
case "datetimeoffset":
return typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType), nil
case "time":
return typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimeKindType), nil
Expand Down
6 changes: 5 additions & 1 deletion clients/redshift/dialect/typing.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ func (RedshiftDialect) DataTypeForKind(kd typing.KindDetails, _ bool) string {
switch kd.ExtendedTimeDetails.Type {
case ext.TimestampTzKindType:
return "timestamp with time zone"
case ext.TimestampNTZKindType:
return "timestamp without time zone"
case ext.DateKindType:
return "date"
case ext.TimeKindType:
Expand Down Expand Up @@ -103,7 +105,9 @@ func (RedshiftDialect) KindForDataType(rawType string, stringPrecision string) (
}, nil
case "double precision":
return typing.Float, nil
case "timestamp with time zone", "timestamp without time zone":
case "timestamp", "timestamp without time zone":
return typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimestampNTZKindType), nil
case "timestamp with time zone":
return typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType), nil
case "time without time zone":
return typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimeKindType), nil
Expand Down
13 changes: 12 additions & 1 deletion clients/redshift/dialect/typing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ func TestRedshiftDialect_DataTypeForKind(t *testing.T) {
}
}
}
{
// Timestamps
{
// With timezone
assert.Equal(t, "timestamp with time zone", RedshiftDialect{}.DataTypeForKind(typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType), false))
}
{
// Without timezone
assert.Equal(t, "timestamp without time zone", RedshiftDialect{}.DataTypeForKind(typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimestampNTZKindType), false))
}
}
}

func TestRedshiftDialect_KindForDataType(t *testing.T) {
Expand Down Expand Up @@ -130,7 +141,7 @@ func TestRedshiftDialect_KindForDataType(t *testing.T) {
kd, err := dialect.KindForDataType("timestamp without time zone", "")
assert.NoError(t, err)
assert.Equal(t, typing.ETime.Kind, kd.Kind)
assert.Equal(t, ext.TimestampTzKindType, kd.ExtendedTimeDetails.Type)
assert.Equal(t, ext.TimestampNTZKindType, kd.ExtendedTimeDetails.Type)
}
{
kd, err := dialect.KindForDataType("time without time zone", "")
Expand Down
10 changes: 5 additions & 5 deletions clients/snowflake/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,9 @@ func (SnowflakeDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool)
case typing.ETime.Kind:
switch kindDetails.ExtendedTimeDetails.Type {
case ext.TimestampTzKindType:
// We are not using `TIMESTAMP_NTZ` because Snowflake does not join on this data very well.
// It ends up trying to parse this data into a TIMESTAMP_TZ and messes with the join order.
// Specifically, if my location is in SF, it'll try to parse TIMESTAMP_NTZ into PST then into UTC.
// When it was already stored as UTC.
return "timestamp_tz"
case ext.TimestampNTZKindType:
return "timestamp_ntz"
case ext.DateKindType:
return "date"
case ext.TimeKindType:
Expand Down Expand Up @@ -99,8 +97,10 @@ func (SnowflakeDialect) KindForDataType(snowflakeType string, _ string) (typing.
return typing.Struct, nil
case "array":
return typing.Array, nil
case "datetime", "timestamp", "timestamp_ltz", "timestamp_ntz", "timestamp_tz":
case "timestamp_ltz", "timestamp_tz":
return typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType), nil
case "timestamp", "datetime", "timestamp_ntz":
return typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimestampNTZKindType), nil
case "time":
return typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimeKindType), nil
case "date":
Expand Down
22 changes: 17 additions & 5 deletions clients/snowflake/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,23 @@ func TestSnowflakeDialect_KindForDataType(t *testing.T) {
}

func TestSnowflakeDialect_KindForDataType_DateTime(t *testing.T) {
expectedDateTimes := []string{"DATETIME", "TIMESTAMP", "TIMESTAMP_LTZ", "TIMESTAMP_NTZ(9)", "TIMESTAMP_TZ"}
for _, expectedDateTime := range expectedDateTimes {
kd, err := SnowflakeDialect{}.KindForDataType(expectedDateTime, "")
assert.NoError(t, err)
assert.Equal(t, ext.TimestampTz.Type, kd.ExtendedTimeDetails.Type, expectedDateTime)
{
// Timestamp with time zone
expectedDateTimes := []string{"TIMESTAMP_LTZ", "TIMESTAMP_TZ"}
for _, expectedDateTime := range expectedDateTimes {
kd, err := SnowflakeDialect{}.KindForDataType(expectedDateTime, "")
assert.NoError(t, err)
assert.Equal(t, ext.TimestampTz.Type, kd.ExtendedTimeDetails.Type, expectedDateTime)
}
}
{
// Timestamp without time zone
expectedDateTimes := []string{"TIMESTAMP", "DATETIME", "TIMESTAMP_NTZ(9)"}
for _, expectedDateTime := range expectedDateTimes {
kd, err := SnowflakeDialect{}.KindForDataType(expectedDateTime, "")
assert.NoError(t, err)
assert.Equal(t, ext.TimestampNTZ.Type, kd.ExtendedTimeDetails.Type, expectedDateTime)
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion lib/cdc/relational/debezium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (r *RelationTestSuite) TestPostgresEventWithSchemaAndTimestampNoTZ() {
r.T(),
ext.NewExtendedTime(
time.Date(2023, time.February, 2, 17, 51, 35, 175445*1000, time.UTC),
ext.TimestampTzKindType, ext.RFC3339Microsecond,
ext.TimestampNTZKindType, ext.RFC3339MicrosecondNoTZ,
),
evtData["ts_no_tz1"],
)
Expand Down
36 changes: 24 additions & 12 deletions lib/debezium/converters/timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,48 +9,60 @@ import (

type Timestamp struct{}

func (Timestamp) ToKindDetails() typing.KindDetails {
return typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType)
func (Timestamp) layout() string {
return ext.RFC3339MillisecondNoTZ
}

func (Timestamp) Convert(value any) (any, error) {
func (t Timestamp) ToKindDetails() typing.KindDetails {
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, t.layout())
}

func (t Timestamp) Convert(value any) (any, error) {
castedValue, err := typing.AssertType[int64](value)
if err != nil {
return nil, err
}

// Represents the number of milliseconds since the epoch, and does not include timezone information.
return ext.NewExtendedTime(time.UnixMilli(castedValue).In(time.UTC), ext.TimestampTzKindType, ext.RFC3339Millisecond), nil
return ext.NewExtendedTime(time.UnixMilli(castedValue).In(time.UTC), ext.TimestampNTZKindType, t.layout()), nil
}

type MicroTimestamp struct{}

func (MicroTimestamp) ToKindDetails() typing.KindDetails {
return typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType)
func (MicroTimestamp) layout() string {
return ext.RFC3339MicrosecondNoTZ
}

func (mt MicroTimestamp) ToKindDetails() typing.KindDetails {
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, mt.layout())
}

func (MicroTimestamp) Convert(value any) (any, error) {
func (mt MicroTimestamp) Convert(value any) (any, error) {
castedValue, err := typing.AssertType[int64](value)
if err != nil {
return nil, err
}

// Represents the number of microseconds since the epoch, and does not include timezone information.
return ext.NewExtendedTime(time.UnixMicro(castedValue).In(time.UTC), ext.TimestampTzKindType, ext.RFC3339Microsecond), nil
return ext.NewExtendedTime(time.UnixMicro(castedValue).In(time.UTC), ext.TimestampNTZKindType, mt.layout()), nil
}

type NanoTimestamp struct{}

func (NanoTimestamp) ToKindDetails() typing.KindDetails {
return typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType)
func (nt NanoTimestamp) ToKindDetails() typing.KindDetails {
return typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, nt.layout())
}

func (NanoTimestamp) layout() string {
return ext.RFC3339NanosecondNoTZ
}

func (NanoTimestamp) Convert(value any) (any, error) {
func (nt NanoTimestamp) Convert(value any) (any, error) {
castedValue, err := typing.AssertType[int64](value)
if err != nil {
return nil, err
}

// Represents the number of nanoseconds since the epoch, and does not include timezone information.
return ext.NewExtendedTime(time.UnixMicro(castedValue/1_000).In(time.UTC), ext.TimestampTzKindType, ext.RFC3339Nanosecond), nil
return ext.NewExtendedTime(time.UnixMicro(castedValue/1_000).In(time.UTC), ext.TimestampNTZKindType, nt.layout()), nil
}
18 changes: 9 additions & 9 deletions lib/debezium/converters/timestamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

func TestTimestamp_Converter(t *testing.T) {
assert.Equal(t, typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType), Timestamp{}.ToKindDetails())
assert.Equal(t, typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, ext.RFC3339MillisecondNoTZ), Timestamp{}.ToKindDetails())
{
// Invalid conversion
_, err := Timestamp{}.Convert("invalid")
Expand All @@ -19,18 +19,18 @@ func TestTimestamp_Converter(t *testing.T) {
// Valid conversion
converted, err := Timestamp{}.Convert(int64(1_725_058_799_089))
assert.NoError(t, err)
assert.Equal(t, "2024-08-30T22:59:59.089Z", converted.(*ext.ExtendedTime).String(""))
assert.Equal(t, "2024-08-30T22:59:59.089", converted.(*ext.ExtendedTime).String(""))
}
{
// ms is preserved despite it being all zeroes.
converted, err := Timestamp{}.Convert(int64(1_725_058_799_000))
assert.NoError(t, err)
assert.Equal(t, "2024-08-30T22:59:59.000Z", converted.(*ext.ExtendedTime).String(""))
assert.Equal(t, "2024-08-30T22:59:59.000", converted.(*ext.ExtendedTime).String(""))
}
}

func TestMicroTimestamp_Converter(t *testing.T) {
assert.Equal(t, typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType), MicroTimestamp{}.ToKindDetails())
assert.Equal(t, typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, ext.RFC3339MicrosecondNoTZ), MicroTimestamp{}.ToKindDetails())
{
// Invalid conversion
_, err := MicroTimestamp{}.Convert("invalid")
Expand All @@ -40,18 +40,18 @@ func TestMicroTimestamp_Converter(t *testing.T) {
// Valid conversion
converted, err := MicroTimestamp{}.Convert(int64(1_712_609_795_827_923))
assert.NoError(t, err)
assert.Equal(t, "2024-04-08T20:56:35.827923Z", converted.(*ext.ExtendedTime).String(""))
assert.Equal(t, "2024-04-08T20:56:35.827923", converted.(*ext.ExtendedTime).String(""))
}
{
// micros is preserved despite it being all zeroes.
converted, err := MicroTimestamp{}.Convert(int64(1_712_609_795_820_000))
assert.NoError(t, err)
assert.Equal(t, "2024-04-08T20:56:35.820000Z", converted.(*ext.ExtendedTime).String(""))
assert.Equal(t, "2024-04-08T20:56:35.820000", converted.(*ext.ExtendedTime).String(""))
}
}

func TestNanoTimestamp_Converter(t *testing.T) {
assert.Equal(t, typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType), NanoTimestamp{}.ToKindDetails())
assert.Equal(t, typing.NewExtendedTimeDetails(typing.ETime, ext.TimestampNTZKindType, ext.RFC3339NanosecondNoTZ), NanoTimestamp{}.ToKindDetails())
{
// Invalid conversion
_, err := NanoTimestamp{}.Convert("invalid")
Expand All @@ -61,12 +61,12 @@ func TestNanoTimestamp_Converter(t *testing.T) {
// Valid conversion
converted, err := NanoTimestamp{}.Convert(int64(1_712_609_795_827_001_000))
assert.NoError(t, err)
assert.Equal(t, "2024-04-08T20:56:35.827001000Z", converted.(*ext.ExtendedTime).String(""))
assert.Equal(t, "2024-04-08T20:56:35.827001000", converted.(*ext.ExtendedTime).String(""))
}
{
// nanos is preserved despite it being all zeroes.
converted, err := NanoTimestamp{}.Convert(int64(1_712_609_795_827_000_000))
assert.NoError(t, err)
assert.Equal(t, "2024-04-08T20:56:35.827000000Z", converted.(*ext.ExtendedTime).String(""))
assert.Equal(t, "2024-04-08T20:56:35.827000000", converted.(*ext.ExtendedTime).String(""))
}
}
14 changes: 11 additions & 3 deletions lib/debezium/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,14 +229,22 @@ func TestField_ToKindDetails(t *testing.T) {
assert.Equal(t, typing.Invalid, kd)
}
{
// Timestamp
// Datetime (for now)
for _, dbzType := range []SupportedDebeziumType{Timestamp, TimestampKafkaConnect, MicroTimestamp, NanoTimestamp, ZonedTimestamp} {
// Timestamp with timezone
for _, dbzType := range []SupportedDebeziumType{ZonedTimestamp} {
kd, err := Field{DebeziumType: dbzType}.ToKindDetails()
assert.NoError(t, err)
assert.Equal(t, typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType), kd)
}
}
{
// Timestamp without timezone
for _, dbzType := range []SupportedDebeziumType{Timestamp, TimestampKafkaConnect, MicroTimestamp, NanoTimestamp} {
kd, err := Field{DebeziumType: dbzType}.ToKindDetails()
assert.NoError(t, err)
assert.Equal(t, ext.TimestampNTZKindType, kd.ExtendedTimeDetails.Type)
assert.Equal(t, typing.ETime.Kind, kd.Kind)
}
}
{
// Dates
for _, dbzType := range []SupportedDebeziumType{Date, DateKafkaConnect} {
Expand Down
8 changes: 4 additions & 4 deletions lib/debezium/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,15 +302,15 @@ func TestField_ParseValue(t *testing.T) {
field := Field{Type: Int64, DebeziumType: dbzType}
value, err := field.ParseValue(int64(1_725_058_799_000))
assert.NoError(t, err)
assert.Equal(t, "2024-08-30T22:59:59.000Z", value.(*ext.ExtendedTime).String(""))
assert.Equal(t, "2024-08-30T22:59:59.000", value.(*ext.ExtendedTime).String(""))
}
}
{
// Nano timestamp
field := Field{Type: Int64, DebeziumType: NanoTimestamp}
val, err := field.ParseValue(int64(1_712_609_795_827_000_000))
assert.NoError(t, err)
assert.Equal(t, ext.NewExtendedTime(time.Date(2024, time.April, 8, 20, 56, 35, 827000000, time.UTC), ext.TimestampTzKindType, "2006-01-02T15:04:05.000000000Z07:00"), val.(*ext.ExtendedTime))
assert.Equal(t, ext.NewExtendedTime(time.Date(2024, time.April, 8, 20, 56, 35, 827000000, time.UTC), ext.TimestampNTZKindType, "2006-01-02T15:04:05.000000000"), val.(*ext.ExtendedTime))
}
{
// Micro timestamp
Expand All @@ -319,13 +319,13 @@ func TestField_ParseValue(t *testing.T) {
// Int64
val, err := field.ParseValue(int64(1_712_609_795_827_009))
assert.NoError(t, err)
assert.Equal(t, ext.NewExtendedTime(time.Date(2024, time.April, 8, 20, 56, 35, 827009000, time.UTC), ext.TimestampTzKindType, ext.RFC3339Microsecond), val.(*ext.ExtendedTime))
assert.Equal(t, ext.NewExtendedTime(time.Date(2024, time.April, 8, 20, 56, 35, 827009000, time.UTC), ext.TimestampNTZKindType, ext.RFC3339MicrosecondNoTZ), val.(*ext.ExtendedTime))
}
{
// Float64
val, err := field.ParseValue(float64(1_712_609_795_827_001))
assert.NoError(t, err)
assert.Equal(t, ext.NewExtendedTime(time.Date(2024, time.April, 8, 20, 56, 35, 827001000, time.UTC), ext.TimestampTzKindType, ext.RFC3339Microsecond), val.(*ext.ExtendedTime))
assert.Equal(t, ext.NewExtendedTime(time.Date(2024, time.April, 8, 20, 56, 35, 827001000, time.UTC), ext.TimestampNTZKindType, ext.RFC3339MicrosecondNoTZ), val.(*ext.ExtendedTime))
}
{
// Invalid (string)
Expand Down
8 changes: 8 additions & 0 deletions lib/optimization/table_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,16 @@ func (t *TableData) MergeColumnsFromDestination(destCols ...columns.Column) erro
inMemoryCol.KindDetails.ExtendedTimeDetails = &ext.NestedKind{}
}

// If the column in the destination is a timestamp_tz and the in-memory column is a timestamp_ntz, append the timezone offset to the in-memory layout (when a layout is set).
if foundColumn.KindDetails.ExtendedTimeDetails.Type == ext.TimestampTzKindType && inMemoryCol.KindDetails.ExtendedTimeDetails.Type == ext.TimestampNTZKindType {
if inMemoryCol.KindDetails.ExtendedTimeDetails.Format != "" {
inMemoryCol.KindDetails.ExtendedTimeDetails.Format += ext.TimezoneOffsetFormat
}
}

// Just copy over the type since the format wouldn't be present in the destination
inMemoryCol.KindDetails.ExtendedTimeDetails.Type = foundColumn.KindDetails.ExtendedTimeDetails.Type

}

// Copy over the decimal details
Expand Down
Loading

0 comments on commit c909d84

Please sign in to comment.