[Kernel] Assign base row ID and default row commit version to AddFile #3894
54b77cc
to
75c0005
Compare
There are a few things missing from the row tracking spec, see my comment
    return new KernelException(
        "Cannot assign baseRowId to add action. "
            + "The number of records in this data file is missing.");
  }
As we discussed, this can be an issue: if connectors don't populate numRecords stats in the addFile actions that are committed, the commit will fail if row tracking is supported (note that this is still better than today, where we always fail in that case since we don't support row tracking).
Question more for kernel folks: do we have some guarantee or requirement that connectors populate numRecords? Are connectors that implement writes today (if any) populating numRecords?
In any case, I would word the exception so that it puts the burden more on the connector, for example:
"All add actions must have statistics that include the number of records when writing to a Delta table with the RowTracking table feature enabled."
> Question more for kernel folks: do we have some guarantee or requirement that connectors populate numRecords? Are connectors that implement writes today (if any) populating numRecords?
cc @vkorukanti thoughts?
We expect the connector to populate numRecords and other stats, since computing them is a heavy operation and we don't want to do that in Kernel. If some protocol feature (e.g., icebergCompatV2) requires stats and they are missing in the DataFileStatus, we throw errors.
So it seems like we should update the Row Tracking protocol to indicate that the AddFile statistics must include numRecords?
+1 on making the error message more explicit about (1) what is unsupported (cannot write to a row tracking table), (2) why it is unsupported (requires numRecords to be populated in stats), and (3) who is responsible/what needs to be updated for support (not supported by this kernel integration/engine).
Thanks for the suggestion! I’ve updated the error message to better explain the situation and our expectations.
> So it seems like we should update the Row Tracking protocol to indicate that the AddFile statistics must include numRecords?
I'm not sure if this should go into the protocol. numRecords is used for assigning baseRowId, but we might have alternative ways to obtain this information (e.g., passing it directly from the connector, or potentially computing it within the Kernel ourselves). It seems more like an expectation for connectors to provide this data.
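To make the thread above concrete, here is a minimal sketch of the kind of check being discussed: fail the write when an add action carries no numRecords statistic, with a message that says what failed, why, and who has to fix it. The class name, method name, and message wording are illustrative assumptions, not the Kernel API or the text that was actually merged.

```java
import java.util.Optional;

// Hypothetical sketch; none of these names come from Delta Kernel.
final class NumRecordsCheckSketch {

  /**
   * Returns the record count of a data file, or fails with a message that
   * places the responsibility on the connector that produced the add action.
   */
  static long requireNumRecords(String path, Optional<Long> numRecords) {
    return numRecords.orElseThrow(() -> new IllegalStateException(
        "Cannot write to a table with the rowTracking feature: the add action for '"
            + path + "' has no numRecords statistic. The connector must populate "
            + "numRecords in the statistics of every add action."));
  }

  public static void main(String[] args) {
    // A file with statistics passes through unchanged.
    System.out.println(requireNumRecords("part-0.parquet", Optional.of(3L)));
    try {
      // A file without statistics is rejected before any row ID is assigned.
      requireNumRecords("part-1.parquet", Optional.empty());
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Keeping the requirement in the error path rather than in the protocol matches the point made above: the count could later come from another source without changing the table format.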
75c0005
to
c584707
Compare
c584707
to
e94da89
Compare
   * @return an {@link CloseableIterable} of data actions with base row IDs and default row commit
   *     versions assigned
   */
  public static CloseableIterable<Row> assignBaseRowIdAndDefaultRowCommitVersion(
Another possible approach is to assign baseRowId and defaultRowCommitVersion when creating AddFile actions.
However, this would duplicate logic across all places where AddFile actions are created (currently just Transaction.generateAppendActions, but potentially more in the future I guess). Plus, baseRowId assignment is stateful, making it tricky if AddFile actions are created in multiple steps/places.
I chose to handle it here for now to keep things centralized, even though it requires converting AddFile rows back to actions, updating them, and converting back to rows.
954641b
to
2816342
Compare
Looks great! Thanks for making this. Left some comments.
700a46f
to
895e0ae
Compare
   * @param protocol the protocol to check
   * @return true if the protocol supports row tracking, false otherwise
   */
  public static boolean isRowTrackingSupported(Protocol protocol) {
Question: where (probably in future PRs?) will you check that RowTracking is enabled (which is strictly stronger than supported)?
https://github.com/delta-io/delta/blob/master/PROTOCOL.md#row-tracking
We will not be checking whether row tracking is enabled in this PR or in planned future ones. The current goal is to add a minimal implementation to ensure row tracking is supported in Delta Kernel. Addressing the enabled requirement is outside the current scope.
I imagine that in the future, we may need to check it is enabled when 1) reconstructing stable row ID / row commit version during reads, and 2) preserving stable row ID / row commit version during writes (e.g., for handling UPDATE and DELETE operations).
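For readers unfamiliar with the distinction, the following sketch shows one way the two states could be told apart, based on my reading of the row tracking section of PROTOCOL.md: "supported" means rowTracking is listed in the protocol's writerFeatures, and "enabled" additionally requires the table property delta.enableRowTracking to be true. The class and method names, and the use of a plain Set and Map in place of Kernel's Protocol and Metadata types, are assumptions made for illustration.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; plain collections stand in for Kernel's Protocol and Metadata.
final class RowTrackingFlagsSketch {

  /** Supported: the rowTracking feature is listed in the protocol's writerFeatures. */
  static boolean isSupported(Set<String> writerFeatures) {
    return writerFeatures.contains("rowTracking");
  }

  /** Enabled: supported, and the table property delta.enableRowTracking is "true". */
  static boolean isEnabled(Set<String> writerFeatures, Map<String, String> tableProperties) {
    return isSupported(writerFeatures)
        && Boolean.parseBoolean(
            tableProperties.getOrDefault("delta.enableRowTracking", "false"));
  }

  public static void main(String[] args) {
    Set<String> features = Collections.singleton("rowTracking");
    // Supported but not enabled: the property is absent.
    System.out.println(isEnabled(features, Collections.<String, String>emptyMap()));
    // Supported and enabled.
    System.out.println(
        isEnabled(features, Collections.singletonMap("delta.enableRowTracking", "true")));
  }
}
```

Under this reading every enabled table is also supported, which is why the reviewer calls enabled "strictly stronger".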
0fd9d1b
to
8a4cfd4
Compare
3e106d8
to
5def107
Compare
Looks great! Just asked for a few minor changes. Thanks!
Super clean btw; glad we did that refactor in another PR!
   *     versions assigned
   */
  public static CloseableIterable<Row> assignBaseRowIdAndDefaultRowCommitVersion(
      SnapshotImpl snapshot, long commitVersion, CloseableIterable<Row> dataActions) {
It seems like snapshot is only being used for getProtocol() now, right?
Can we just pass in the protocol? So we aren't coding to a specific implementation of snapshot (SnapshotImpl).
snapshot is actually also used to read the row ID high watermark from the domain metadata:
final AtomicLong currRowIdHighWatermark = new AtomicLong(readRowIdHighWaterMark(snapshot));
And this uses the SnapshotImpl API getDomainMetadataMap.
   * plus 1. The high watermark is then incremented by the number of records in the file. For
   * actions missing a default row commit version, assigns the specified commit version.
   *
   * @param snapshot the current snapshot of the table
This is a static function, right? So what does "current" mean? Can you give some temporal context of when this method should be called during a transaction?
Further, it is only called during transactions (writes). Would it ever be called during reads?
By "current snapshot" I mean the snapshot that this transaction is reading at, i.e., TransactionImpl.readSnapshot.
> Further, it is only called during transactions (writes). Would it ever be called during reads?
Yes, it is only called during transactions (writes), and won't be called for reads.
I've updated the doc to make it clearer and provided more context. Thanks!
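The assignment rule described in the Javadoc under review can be sketched in isolation as follows. This is a simplified illustration, not the code in RowTracking.java: the Add class, the use of -1 as an "unassigned" marker, and the starting watermark of -1 for a table with no assigned rows are all assumptions made here. The real method works on Row objects and reads the watermark from the delta.rowTracking metadata domain of the transaction's read snapshot.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the assignment rule; not the Kernel implementation.
final class BaseRowIdSketch {

  /** Simplified add action. A value of -1 marks a field as not yet assigned. */
  static final class Add {
    final String path;
    final long numRecords;
    long baseRowId = -1;
    long defaultRowCommitVersion = -1;

    Add(String path, long numRecords) {
      this.path = path;
      this.numRecords = numRecords;
    }
  }

  /**
   * Assigns a fresh baseRowId (high watermark + 1) to each add action that lacks one,
   * advances the watermark by that file's record count, and fills in a missing
   * defaultRowCommitVersion with the commit version. Returns the new high watermark.
   */
  static long assign(List<Add> adds, long highWatermark, long commitVersion) {
    long current = highWatermark;
    for (Add add : adds) {
      if (add.baseRowId < 0) {
        add.baseRowId = current + 1;
        current += add.numRecords;
      }
      if (add.defaultRowCommitVersion < 0) {
        add.defaultRowCommitVersion = commitVersion;
      }
    }
    return current;
  }

  public static void main(String[] args) {
    List<Add> adds = new ArrayList<>();
    adds.add(new Add("part-0.parquet", 3));
    adds.add(new Add("part-1.parquet", 5));
    long newWatermark = assign(adds, -1L, 7L);
    for (Add add : adds) {
      System.out.println(add.path + " baseRowId=" + add.baseRowId
          + " defaultRowCommitVersion=" + add.defaultRowCommitVersion);
    }
    System.out.println("new high watermark=" + newWatermark);
  }
}
```

Starting from a watermark of -1, files with 3 and 5 records receive base row IDs 0 and 3, and the watermark ends at 7. The running counter is the stateful part mentioned earlier in the review, which is the argument for doing the assignment in one place at commit time.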
Looks good. We need to ensure that writing to a table with row tracking supported works on tables that are on writer version 3 (which includes invariants) but don't contain any actual invariant.
  }

  test("Integration test - Write table with Spark then write with Kernel") {
    // TODO: Implement this test. Creating and writing a table using Spark with row tracking also
Looking at https://github.com/delta-io/delta/blame/d86ae203a596bedb46b5751f15ca3847613fe0a2/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java#L262, this should only fail if the table actually contains an invariant, so in most cases it would work, i.e., we should be able to write a test here.
Will address this in a follow up PR. Thanks!
3b616b8
to
dfbf700
Compare
…delta-io#3894)

#### Which Delta project/connector is this regarding?

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [x] Kernel
- [ ] Other (fill in here)

## Description

This PR implements the row tracking _support_ requirements in Delta Kernel, according to the [Delta Protocol](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#row-tracking). Specifically, it includes:

- assigning `baseRowId` and `defaultRowCommitVersion` to `AddFile` actions prior to committing them
- maintaining the `rowIdHighWaterMark` of the `delta.rowTracking` metadata domain during the base row ID assignment, which is the highest assigned fresh row id for the table

## How was this patch tested?

Added tests in `RowTrackingSuite.scala`. This includes unit tests and integration tests with Delta-Spark.

## Does this PR introduce _any_ user-facing changes?

No.