-
Notifications
You must be signed in to change notification settings - Fork 58
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add in-commit timestamp support for change data feed #617
base: main
Are you sure you want to change the base?
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #617 +/- ##
==========================================
+ Coverage 83.43% 83.47% +0.03%
==========================================
Files 75 75
Lines 16922 16978 +56
Branches 16922 16978 +56
==========================================
+ Hits 14119 14172 +53
- Misses 2146 2148 +2
- Partials 657 658 +1 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, one nit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
few things looks good tho!
for actions in action_iter { | ||
let actions = actions?; | ||
|
||
let mut visitor = PreparePhaseVisitor { | ||
add_paths: &mut add_paths, | ||
remove_dvs: &mut remove_dvs, | ||
has_cdc_action: &mut has_cdc_action, | ||
commit_timestamp: &mut timestamp, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would this be clearer?
commit_timestamp: &mut timestamp, | |
in_commit_timestamp: &mut timestamp, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We initialize this field with the file modification timestamp, so it would be inaccurate to call it that. I do like the update you made below tho when we actually read ICT from a commitinfo.
@@ -136,15 +137,14 @@ impl LogReplayScanner { | |||
/// 2. Construct a map from path to deletion vector of remove actions that share the same path | |||
/// as an add action. | |||
/// 3. Perform validation on each protocol and metadata action in the commit. | |||
/// 4. Extract the in-commit timestamp from [`CommitInfo`] if it is present. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't comment on L130 above but I think we need to do some comment updates?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch! I went through every mention of ICT and I think I got them all.
Action::CommitInfo(CommitInfo { | ||
in_commit_timestamp: Some(timestamp), | ||
..Default::default() | ||
}), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what happens if commit info isn't first? do we still read it? I know the protocol says it must be first with ICT enabled but I wonder what the expected behavior is when it isn't first? do we do the right thing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(but probably don't solve here)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussed a little here:
#581 (comment)
I'm still quite certain that delta-spark doesn't care about the ordering because it goes through the all actions in the commit looking for CommitInfo
var commitInfo: Option[CommitInfo] = None
actions.foreach {
case c: AddCDCFile =>
cdcActions.append(c)
totalFiles += 1L
totalBytes += c.size
case a: AddFile =>
totalFiles += 1L
totalBytes += a.size
case r: RemoveFile =>
totalFiles += 1L
totalBytes += r.size.getOrElse(0L)
case i: CommitInfo => commitInfo = Some(i)
case _ => // do nothing
}
I've added a check that only puts in the ICT if it is the first action in the log, but there comes a question: should we fail if it isn't the first action?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can also revert the check that CommitInfo is first and revisit that in a future PR.
Arc::new(StructType::new(vec![ | ||
Option::<Add>::get_struct_field(ADD_NAME), | ||
Option::<Remove>::get_struct_field(REMOVE_NAME), | ||
Option::<Cdc>::get_struct_field(CDC_NAME), | ||
Option::<Metadata>::get_struct_field(METADATA_NAME), | ||
Option::<Protocol>::get_struct_field(PROTOCOL_NAME), | ||
StructField::new("commitInfo", StructType::new([ict_type]), true), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
though i wonder if we can do something similar to above like Option<CommitInfo>::get_struct_field(COMMIT_INFO_NAME)
and get struct field inCommitTimestamp
of that?
but for now at least can use COMMIT_INFO_NAME
?
StructField::new("commitInfo", StructType::new([ict_type]), true), | |
StructField::new(COMMIT_INFO_NAME, StructType::new([ict_type]), true), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i wonder if we can do something similar to above like Option::get_struct_field(COMMIT_INFO_NAME) and get struct field inCommitTimestamp of that?
We would get a StructField
of type CommitInfo, which we'd have to 1) get datatype, 2) cast to a struct 3) get the ICT field. So I'll stick with your suggested change 👍
Action::Cdc(cdc.clone()), | ||
Action::CommitInfo(commit_info.clone()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are these ordered? should commit info be first?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
swapped ordering
da9fe5e
to
7242904
Compare
7242904
to
f50c202
Compare
What changes are proposed in this pull request?
This adds support for in-commit timestamps when performing change data feed. Now when a commit contains commitInfo with
inCommitTimestamp
, that timestamp will be the one used for all changed rows in the commit.Depends on #581
Please only review these commits.
How was this change tested?
Add tests to check that the timestamp extracted from commits containing in-commit-timestamps are the ICT instead of file modification time.