-
Notifications
You must be signed in to change notification settings - Fork 8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support date/time compute fields #25
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The approach LGTM
how does this patch play with the Cassandra Sink, the ElasticSearch Sink (or the other Sinks) ?
It works with with ES sink but not with Cassandra Sink (because the logical types that are supported are the CDC custom ones, not standard Avro). I created an issue to support that. As far as this patch, it makes sense to stick only to the standard avro types. |
930a20d
to
d3d14ce
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
great stuff
...ons/src/main/java/com/datastax/oss/pulsar/functions/transforms/jstl/CustomTypeConverter.java
Outdated
Show resolved
Hide resolved
LocalTime localTime = (LocalTime) value; | ||
return (int) (localTime.toNanoOfDay() / 1000000); | ||
case "timestamp-millis": | ||
validateLogicalType(value, schema.getLogicalType(), LocalDateTime.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't it be mapped to a j.u.Instant ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is driven by the type generated from the JSTL evaluator. Now the same question applies (why not use Instant with ejstl?)
We could, but I found it:
- More consistent with the
TIME
andDATE
types (because for those the better option isLocalTime
&LocalDate
as those parse the provided input in ISO format parts out of the box - things like2017-02-10
or22:30:00
- Instant iso format require the user to pass empty Z in the format (like
2017-02-10T22:30:00Z
, but local date time accepts2017-02-10T22:30:00
. As far as user is concerned, all dates in their avro output will be in reference to epoch - LocalDateTime also provides
plus
operation with Units of years and months which are utilized with thedateadd
method. Instant only accepts those values: https://docs.oracle.com/javase/8/docs/api/java/time/Instant.html#plus-long-java.time.temporal.TemporalUnit-
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems to me that if the datetime is referenced to epoch, Instant is the type to use whereas LocalDatetime is not bound to a timezone (it's the time you see on the clock on the wall without knowing where you are). But probably you transparently assume UTC timezone and the user doesn't see the LocalDatetime type anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that is fair, let me switch to time instant. And then we can decide separately if we want the user format to be:
'2007-12-03T10:15:30
vs 2007-12-03T10:15:30.00Z
(the latter is the Instant requirement, empty Z must be included indicating UTC.
Either way, the user is not aware of the underlying date object - it is just easier for us to use the underlying parse method as is without pre-processing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK I see. What I would suggest is that we support RFC3339 dates. This maps well to j.u.t.OffsetDateTime.
With maybe some tolerance on the input string where if the time offset is not there, we assume UTC. I think Jackson does it (not sure).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also OffsetDateTime has plusMonths
and plusYears
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think Jackson does it (not sure).
It does not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to ISO 3330. to make documentation easier, we can stick to its format and refer user to the RFC. (for UTC, they could write either 2022-07-02T01:02:00Z OR 2022-07-02T01:02:00+00). If the provide millis (which I think will be common case) we assume epoch millis. Another common use case that won't include the string is fn:now()).
we can always add more permissive format (like 2022-07-02T01:02:00) without breaking existing users...
long actualNow = | ||
new JstlEvaluator<Long>("${fn:now()}", long.class).evaluate(primitiveStringContext); | ||
|
||
assertTrue(actualNow >= expectedNow); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we could have the possibility to pass a custom Clock to the class under test.
This way it's easier to control time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1. will do.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
JUEL only supports binding of static method, so I have to do it the suboptimal way by introducing a clock on the static class and modify it by tests (as opposed to dependency injection)
|
||
long expected = Instant.now().plusSeconds(-3333).toEpochMilli(); | ||
long actual = | ||
new JstlEvaluator<Long>("${fn:dateadd(fn:now(), -3333, 'seconds')}", long.class) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be nice to be able to do pass an ISO8601 duration: https://en.m.wikipedia.org/wiki/ISO_8601#Durations.
Jackson has good support for deserializing Durations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Jackson may not even be needed. It's probably as simple as Duration.parse() : https://docs.oracle.com/javase/8/docs/api/java/time/Duration.html#parse-java.lang.CharSequence-
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, will add in a separate PR: #28
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
btw the Duration in java.time supports only this format (still ISO compliant): PnDTnHnMn.nS
so user cannot include years or months. Joda period maybe a solution. Check #28 for more info.
a1705cf
to
b9a694d
Compare
} | ||
|
||
private static long dateadd(LocalDateTime localDateTime, long delta, String unit) { | ||
ChronoUnit chronoUnit = dateAddUnits.get(unit); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could use ChronoUnit.valueOf() and remove the dateAddUnits map ?
public static long dateadd(Object input, long delta, String unit) {
if (input instanceof String) {
return dateadd((String) input, delta, unit);
} else if (input instanceof Long) {
return dateadd((long) input, delta, unit);
}
throw new IllegalArgumentException(
"Invalid input type: "
+ input.getClass().getSimpleName()
+ ". Should either be an RFC3339 datetime string like '2007-12-01T12:30:00Z' or epoch millis");
}
private static long dateadd(String isoDateTime, long delta, String unit) {
OffsetDateTime offsetDateTime = OffsetDateTime.parse(isoDateTime);
return dateadd(offsetDateTime, delta, unit);
}
private static long dateadd(long epochMillis, long delta, String unit) {
Instant instant = Instant.ofEpochMilli(epochMillis);
OffsetDateTime offsetDateTime = OffsetDateTime.ofInstant(instant, ZoneOffset.UTC);
return dateadd(offsetDateTime, delta, unit);
}
private static long dateadd(OffsetDateTime offsetDateTime, long delta, String unit) {
try {
ChronoUnit chronoUnit = ChronoUnit.valueOf(unit.toUpperCase());
return offsetDateTime.plus(delta, chronoUnit).toInstant().toEpochMilli();
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
"Invalid unit: "
+ unit
+ ". Should be one of "
+ Arrays.stream(ChronoUnit.values())
.map(s -> s.name().toLowerCase())
.collect(Collectors.toList()),
e);
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The unit does not map one to one with the Chrono units (it is a subset):
[nanos, micros, millis, seconds, minutes, hours, half_days, days, weeks, months, years, decades, centuries, millennia, eras, forever]
vs years, months, days,hours,minutes,millis
- also just preder to have them decoupled in case we use different names (day vs days)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right. It's better to keep it decoupled.
Maybe use a switch instead of a Map ? (no strong opinion here)
If we keep the Map, we should initialize it with Map.of() (min JDK is 11) instead of a static initializer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1. Switch fits better, I wanted to cache them for some reason but there is no need to.
assertNull(converter.convert(null, String.class)); | ||
assertNull(converter.convert(null, int.class)); | ||
assertNull(converter.convert(null, long.class)); | ||
assertNull(converter.convert(null, float.class)); | ||
assertNull(converter.convert(null, double.class)); | ||
assertNull(converter.convert(null, boolean.class)); | ||
assertNull(converter.convert(null, LocalDate.class)); | ||
assertNull(converter.convert(null, LocalTime.class)); | ||
assertNull(converter.convert(null, LocalDateTime.class)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still LocalDateTime ?
return clock.millis(); | ||
} | ||
|
||
private static final ZoneId UTC = ZoneId.of("UTC"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Already exists as ZoneOffset.UTC
.getTypes() | ||
.stream() | ||
.filter(subSchema -> subSchema.getLogicalType() != null) | ||
.map(Schema::getLogicalType) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The filter and map can be switched. And filter is then just Objects::nonnull
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
Fixes: #20
This patch adds support for the following date/time types:
Date
: Contains date info only, represented as ISO 8601 date like '2022-10-02'Time
: Contains time info only, represented as ISO 8601 time like '23:00:00'OffsetDateTime
: Full date time info, represented as UTC ISO 3339 like '2011-12-03T10:15:30Z' or '2011-12-03T10:15:30+00:00' or '2011-12-03T10:15:30+02:00'[years, months, days, hours, minutes, seconds, millis]
Implementation note:
Date
,Time
,OffsetDateTime
tojava.time.LocalDate
,java.time.LocalTime
,java.time.Instant
respectively.Date
,Time
,OffsetDateTime
are converted to logical timesdate
,time-millis
, timestamp-millis respectively