Critical Warnings For Users of Iceberg Kafka Connect Need To Be Documented #12357

Open

fuzing opened this issue Feb 20, 2025 · 1 comment

Labels: improvement (PR that improves existing functionality)

fuzing commented Feb 20, 2025

Feature Request / Improvement

The documentation for this connector should call out its dangerous caveats more clearly, namely:

The connector stores its Kafka topic offsets as ancillary data within the Iceberg snapshot data (the snapshot summary). Committing new data to a table requires the Kafka offset to be >= the offsets stored in the connector-created snapshot summary data persisted in Iceberg.

This can be seen in the `Coordinator#commitToTable()` method:

```java
List<Envelope> filteredEnvelopeList =
    envelopeList.stream()
        .filter(
            envelope -> {
              Long minOffset = committedOffsets.get(envelope.partition());
              return minOffset == null || envelope.offset() >= minOffset;
            })
        .collect(toList());
```
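
If you want to see what the connector has actually persisted, the snapshot summary can be inspected directly with the Iceberg API. This is only a minimal sketch: the `HadoopCatalog`, warehouse path, and table name below are placeholders, and the exact summary keys the connector writes may differ, so treat it as illustrative rather than authoritative.

```java
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class InspectSnapshotSummary {
  public static void main(String[] args) {
    // Placeholder catalog/table; substitute whatever catalog the connector writes to.
    HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "/tmp/warehouse");
    Table table = catalog.loadTable(TableIdentifier.of("db", "events"));

    Snapshot snapshot = table.currentSnapshot();
    if (snapshot == null) {
      System.out.println("No snapshots yet");
      return;
    }

    // The connector's control metadata lives in the snapshot summary as plain
    // string properties, alongside Iceberg's own counters (added-files, etc.).
    Map<String, String> summary = snapshot.summary();
    summary.forEach((key, value) -> System.out.println(key + " = " + value));
  }
}
```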

This is very problematic in the following scenarios:

  1. If Kafka state is not persisted across invocations (e.g., Docker/Kubernetes without volumes for the Kafka data), then upon restart this connector will not begin committing data until the offsets are >= those stored in the Iceberg snapshot summary. So, if you commit up to, say, offset 10,000 for a given table and then restart Kafka, there won't be a new Iceberg commit for that table until you get past offset 10,000 again. Worse still, this filtering happens only at commit time (i.e. after your actual data files have already been written to Iceberg), meaning you'll end up with 10,000+ orphaned files in Iceberg (see the cleanup sketch after this list).
  2. When using this connector, it should probably be the only ingestion source into an Iceberg table. Other ingestion sources won't use the same semantics as this connector and clearly won't write this extra snapshot summary data. This is less of an issue because the connector treats the absence of the special snapshot summary (i.e. minOffset == null) as an affirmative signal to commit, but it is definitely a design smell.
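
For scenario (1), the data files written before the filtered-out commit can be reclaimed with Iceberg's delete-orphan-files action. A minimal sketch, assuming a Spark session with the Iceberg Spark runtime on the classpath; the table name and the retention window are placeholders:

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.SparkSession;

public class CleanUpOrphanFiles {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().appName("orphan-cleanup").getOrCreate();
    Table table = Spark3Util.loadIcebergTable(spark, "db.events");

    // Only delete files older than 3 days so in-flight connector writes are not removed.
    long olderThanMillis = System.currentTimeMillis() - 3L * 24 * 60 * 60 * 1000;

    DeleteOrphanFiles.Result result =
        SparkActions.get(spark)
            .deleteOrphanFiles(table)
            .olderThan(olderThanMillis)
            .execute();

    result.orphanFileLocations().forEach(location -> System.out.println("Removed: " + location));
    spark.stop();
  }
}
```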

It looks like a fair number of the currently outstanding issues in this repo center on these design anomalies, so it would definitely be wise to document them.

One solution for environments that don't persist Kafka state would be a boolean setting that makes the connector disregard the Iceberg-persisted offsets for its initial commit (this would solve (1) above, but not (2)); a rough sketch follows.
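
To be concrete, here is a purely hypothetical sketch of how such a flag might be applied in the filter shown above; `ignoreCommittedOffsetsOnStart` and `firstCommit` are invented names, not existing connector configuration or fields:

```java
// Hypothetical only: skip the snapshot-summary offset filter on the first commit
// after startup when the (invented) flag is enabled.
boolean skipOffsetFilter = ignoreCommittedOffsetsOnStart && firstCommit;

List<Envelope> filteredEnvelopeList =
    envelopeList.stream()
        .filter(
            envelope -> {
              if (skipOffsetFilter) {
                return true; // trust Kafka's own committed offsets instead
              }
              Long minOffset = committedOffsets.get(envelope.partition());
              return minOffset == null || envelope.offset() >= minOffset;
            })
        .collect(toList());
```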

Otherwise, this connector is a great solution!

Please see here for the open issue over at Databricks.

Query engine

None

Willingness to contribute

  • I can contribute this improvement/feature independently
  • I would be willing to contribute this improvement/feature with guidance from the Iceberg community
  • I cannot contribute this improvement/feature at this time
@rossmaddenwork

Relevant comment from another repo: databricks/iceberg-kafka-connect#308 (comment).
