Skip to content

Commit

Permalink
Back ISO datetime strings by java.time.Instant
Browse files Browse the repository at this point in the history
  • Loading branch information
aymkhalil committed Oct 6, 2022
1 parent fe181a2 commit b9a694d
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@

import com.datastax.oss.pulsar.functions.transforms.model.ComputeField;
import com.datastax.oss.pulsar.functions.transforms.model.ComputeFieldType;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -203,9 +202,9 @@ private Object getAvroValue(Schema schema, Object value) {
LocalTime localTime = (LocalTime) value;
return (int) (localTime.toNanoOfDay() / 1000000);
case "timestamp-millis":
validateLogicalType(value, schema.getLogicalType(), LocalDateTime.class);
LocalDateTime localDateTime = (LocalDateTime) value;
return localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli();
validateLogicalType(value, schema.getLogicalType(), Instant.class);
Instant instant = (Instant) value;
return instant.toEpochMilli();
}

throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

import de.odysseus.el.misc.TypeConverter;
import de.odysseus.el.misc.TypeConverterImpl;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import javax.el.ELException;

Expand All @@ -37,8 +37,8 @@ public <T> T convert(Object o, Class<T> aClass) throws ELException {
return (T) LocalDate.parse(o.toString());
} else if (aClass == LocalTime.class) {
return (T) LocalTime.parse(o.toString());
} else if (aClass == LocalDateTime.class) {
return (T) LocalDateTime.parse(o.toString());
} else if (aClass == Instant.class) {
return (T) Instant.parse(o.toString());
}
return typeConverter.convert(o, aClass);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ public static long dateadd(Object input, long delta, String unit) {
}

private static long dateadd(String isoDateTime, long delta, String unit) {
LocalDateTime localDateTime = LocalDateTime.parse(isoDateTime);
return dateadd(localDateTime, delta, unit);
Instant instant = Instant.parse(isoDateTime);
return dateadd(LocalDateTime.ofInstant(instant, UTC), delta, unit);
}

private static long dateadd(long epochMillis, long delta, String unit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
package com.datastax.oss.pulsar.functions.transforms.model;

import com.datastax.oss.pulsar.functions.transforms.jstl.JstlEvaluator;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Set;
import javax.el.ELException;
Expand Down Expand Up @@ -125,7 +125,7 @@ private Class<?> getJavaType() {
case TIME:
return LocalTime.class;
case DATETIME:
return LocalDateTime.class;
return Instant.class;
default:
throw new UnsupportedOperationException("Unsupported compute field type: " + type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ private List<ComputeField> buildComputeFields(String scope, boolean optional, bo
fields.add(
ComputeField.builder()
.scopedName(scope + "." + "newDateTimeField")
.expression(nullify ? "null" : "'2007-12-03T10:15:30'")
.expression(nullify ? "null" : "'2007-12-03T10:15:30Z'")
.optional(optional)
.type(ComputeFieldType.DATETIME)
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,37 +150,37 @@ public static Object[][] functionExpressionProvider() {
{"fn:concat(value, null)", primitiveStringContext, "test-message"},
{"fn:concat(null, '-suffix')", primitiveStringContext, "-suffix"},
{
"fn:dateadd('2017-01-02T00:01:02', 1, 'years')",
"fn:dateadd('2017-01-02T00:01:02Z', 1, 'years')",
primitiveStringContext,
Instant.parse("2018-01-02T00:01:02Z").toEpochMilli()
},
{
"fn:dateadd('2017-01-02T00:01:02', -1, 'months')",
"fn:dateadd('2017-01-02T00:01:02Z', -1, 'months')",
primitiveStringContext,
Instant.parse("2016-12-02T00:01:02Z").toEpochMilli()
},
{
"fn:dateadd('2017-01-02T00:01:02', 1, 'days')",
"fn:dateadd('2017-01-02T00:01:02Z', 1, 'days')",
primitiveStringContext,
Instant.parse("2017-01-03T00:01:02Z").toEpochMilli()
},
{
"fn:dateadd('2017-01-02T00:01:02', -1, 'hours')",
"fn:dateadd('2017-01-02T00:01:02Z', -1, 'hours')",
primitiveStringContext,
Instant.parse("2017-01-01T23:01:02Z").toEpochMilli()
},
{
"fn:dateadd('2017-01-02T00:01:02', 1, 'minutes')",
"fn:dateadd('2017-01-02T00:01:02Z', 1, 'minutes')",
primitiveStringContext,
Instant.parse("2017-01-02T00:02:02Z").toEpochMilli()
},
{
"fn:dateadd('2017-01-02T00:01:02', -1, 'seconds')",
"fn:dateadd('2017-01-02T00:01:02Z', -1, 'seconds')",
primitiveStringContext,
Instant.parse("2017-01-02T00:01:01Z").toEpochMilli()
},
{
"fn:dateadd('2017-01-02T00:01:02', 1, 'millis')",
"fn:dateadd('2017-01-02T00:01:02Z', 1, 'millis')",
primitiveStringContext,
Instant.parse("2017-01-02T00:01:02.001Z").toEpochMilli()
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public static Object[][] millisDateAddProvider() {
*/
@DataProvider(name = "isoDateAddProvider")
public static Object[][] isoDateAddProvider() {
String isoDateTime = "2022-10-02T01:02:03";
String isoDateTime = "2022-10-02T01:02:03Z";
Instant instant = Instant.parse("2022-10-02T01:02:03Z");
return new Object[][] {
{isoDateTime, 0, "years", instant.toEpochMilli()},
Expand Down

0 comments on commit b9a694d

Please sign in to comment.