
previous eq deletes handling on new write #12280

Open
1 of 3 tasks
eshishki opened this issue Feb 15, 2025 · 8 comments
Labels
improvement PR that improves existing functionality

Comments

@eshishki

Feature Request / Improvement

We do ingestion from Debezium to Iceberg via https://github.com/databricks/iceberg-kafka-connect/
Basically it uses the Flink delta writer.

Each batch of data writes a small number of eq deletes for updates to data from previous commits.
Most of our DB primary keys are UUIDs, so even a handful of eq delete rows covers a large portion of the data files (via the lower/upper bounds check),
forcing a costly check at query time.

We do run a periodic compaction process, but it is inefficient, since it forces us to rewrite practically the whole table, which becomes "dirty" again within the 5-minute commit interval.

We thought about writing multiple eq delete files, to make the bounds more granular and to emulate a poor man's Bloom filter.
But that again adds many ranges and only postpones the issue; the table would be dirty in, say, 30 minutes instead of 5.

If, however, a new writer could read the previous handful of eq deletes, maybe it could combine them with the new ones, so that the number of range buckets stays roughly constant.

Query engine

None

Willingness to contribute

  • I can contribute this improvement/feature independently
  • I would be willing to contribute this improvement/feature with guidance from the Iceberg community
  • I cannot contribute this improvement/feature at this time
@eshishki added the improvement (PR that improves existing functionality) label on Feb 15, 2025
@eshishki
Author

maybe we could have a rewrite_position_delete_files equivalent, but for eq deletes
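
for concreteness, a minimal sketch of what that could look like next to the existing action, assuming the Spark actions API; the rewriteEqualityDeleteFiles call and its option are hypothetical, only rewritePositionDeleteFiles exists today:

```java
// Sketch only: rewritePositionDeleteFiles() is the existing maintenance action;
// rewriteEqualityDeleteFiles() and its option are hypothetical, shown just to
// illustrate the shape of the requested eq-delete equivalent.
import org.apache.iceberg.Table;
import org.apache.iceberg.spark.actions.SparkActions;

public class DeleteMaintenanceSketch {
  public static void compactDeletes(Table table) {
    // Existing action: compacts / rewrites position delete files.
    SparkActions.get()
        .rewritePositionDeleteFiles(table)
        .execute();

    // Hypothetical equality-delete analogue requested in this issue (does not exist):
    // SparkActions.get()
    //     .rewriteEqualityDeleteFiles(table)           // hypothetical API
    //     .option("target-delete-file-count", "12")    // hypothetical option
    //     .execute();
  }
}
```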

@singhpk234
Contributor

singhpk234 commented Feb 16, 2025

Sounds fair. If eq deletes are partition scoped, maybe we need to stack them either per write or as part of an async process like rewrite_position_delete_files. Side note: can you please also share which reader you are referring to specifically, as every reader handles this quite differently; for example, Impala writes this as a JOIN whereas Spark has a completely different approach.

Let me think more about this approach

@eshishki
Author

currently we use StarRocks, which plans the scan like:

UNION
├── ICEBERG_SCAN (data files with only pos deletes)
│   └── OutputRows: 73
│
└── HASH_JOIN (LEFT ANTI JOIN)
    ├── <PROBE> ICEBERG_SCAN (data files covered by eq deletes)
    │   └── OutputRows: ~124.5M rows
    │
    └── <BUILD> ICEBERG_SCAN (rows from eq delete files)
        ├── OutputRows: 246
        └── Join Condition: id = id

so for 246 eq delete rows, the bound stats cover practically all of the data files

i don't know if it could be improved with a bloom filter in canContainEqDeletesForFile
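
to make the bloom filter idea concrete, an illustration only (not Iceberg's actual planner code, and the per-file id filter is an assumption, e.g. sourced from Parquet column Bloom filters): the bounds test alone matches nearly every data file when the keys are random UUIDs, while probing the handful of deleted ids against a Bloom filter over the data file's id column could reject most of them:

```java
// Illustration of bounds-only pruning vs. a hypothetical Bloom-filter refinement.
// Not Iceberg's actual code; the per-data-file id filter is an assumed input.
import java.nio.charset.StandardCharsets;
import java.util.List;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

class EqDeletePruningSketch {
  // Range-overlap test, similar in spirit to canContainEqDeletesForFile:
  // with random UUID keys this is true for almost every data file.
  static boolean boundsOverlap(String dataLower, String dataUpper,
                               String deleteLower, String deleteUpper) {
    return dataLower.compareTo(deleteUpper) <= 0 && deleteLower.compareTo(dataUpper) <= 0;
  }

  // Hypothetical tighter check: keep the data file only if at least one deleted id
  // might actually be present in it according to a Bloom filter over its id column.
  static boolean mightContainEqDeletes(String dataLower, String dataUpper,
                                       String deleteLower, String deleteUpper,
                                       BloomFilter<CharSequence> dataFileIdFilter,
                                       List<String> deletedIds) {
    if (!boundsOverlap(dataLower, dataUpper, deleteLower, deleteUpper)) {
      return false;
    }
    return deletedIds.stream().anyMatch(dataFileIdFilter::mightContain);
  }

  // Example of building such a filter (in practice it would come from file metadata,
  // not be rebuilt at planning time).
  static BloomFilter<CharSequence> idFilter(List<String> idsInDataFile) {
    BloomFilter<CharSequence> filter =
        BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), idsInDataFile.size(), 0.01);
    idsInDataFile.forEach(filter::put);
    return filter;
  }
}
```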

@singhpk234
Contributor

I see, if it is written this way, i.e. as a join, each eq delete would be scanned only once, right? (Same as what Impala does.) Is there a configuration to read multiple eq deletes in a single execution task (essentially pack them)? There will always be an issue with parallelism if we try to rewrite eq deletes.

Consider that ICEBERG_SCAN (rows from eq delete files) could support scanning 10 files in parallel, but if we compact/rewrite those 10 files into 1, the read parallelism gets reduced to 1. Maybe it is better to bin-pack at the engine level when scanning?

The problem is even worse in Spark, as an eq delete can get scanned multiple times for a single data file, so we need some strategy around how to distribute these tasks.

@eshishki
Author

in our scenario each commit adds 1 eq delete file, every 5 minutes, 12 times an hour
we run compaction, say, every hour, and the number of eq delete files stays within reason

i think we can trade a larger number of delete files for more granular bounds, so that we reduce the number of data files and rows we need to recheck for deletes

so the theoretical eq_delete_rewrite procedure should:

  1. try to keep the number of eq delete files constant
  2. rewrite so as to minimize the number of data file rows overlapped by eq delete file bounds

this would help spark too, since it would reduce the number of file references; a rough sketch of the regrouping idea is below
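
a minimal sketch of that regrouping (hypothetical, not an existing Iceberg procedure, and ignoring for the moment the sequence-number question discussed further down the thread): sort the surviving deleted keys and split them into a fixed number of files, so each output file gets a tight, disjoint key range:

```java
// Hypothetical regrouping step for an eq-delete rewrite: produce a fixed number of
// delete files whose lower/upper bounds are narrow and disjoint, so they overlap far
// fewer data files than the original per-commit delete files.
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class EqDeleteRegroupSketch {
  static List<List<String>> regroup(List<String> deletedKeys, int targetFileCount) {
    List<String> sorted = new ArrayList<>(deletedKeys);
    Collections.sort(sorted);
    List<List<String>> groups = new ArrayList<>();
    int groupSize = (sorted.size() + targetFileCount - 1) / targetFileCount;
    for (int start = 0; start < sorted.size(); start += groupSize) {
      int end = Math.min(start + groupSize, sorted.size());
      // Each group would become one equality-delete file whose bounds are
      // sorted.get(start) .. sorted.get(end - 1).
      groups.add(new ArrayList<>(sorted.subList(start, end)));
    }
    return groups;
  }
}
```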

now our situation is:
Record Counts:
Total Data Records: 124,557,336
Total Data Files: 217
Records with no deletes: 0
Records with only eq deletes: 0
Records with only pos deletes: 1,164
Records with both deletes: 124,556,172

Delete Statistics:
Records with eq deletes total: 124,556,172
Unique eq delete files: 11
Eq delete files referenced: 2,204
Eq delete records: 8,714

Pos Delete Statistics:
Unique pos delete files: 10
Pos delete files referenced: 2,059
Pos delete records: 170

frankly i would love to see any improvement that reduces "Records with eq deletes total"

@pvary
Contributor

pvary commented Feb 17, 2025

Be careful about rewriting equality deletes to new equality deletes. An equality delete removes every occurrence of the matching row from the previous commits.
For example:

  • Commit 1 adds row with PK1, PK2 - Creates a data file with PK1 and PK2
  • Commit 2 deletes PK1 - Creates an equality delete for PK1
  • Commit 3 inserts PK1 - Creates a data file for PK1
  • Commit 4 updates PK2 - Creates an equality delete for PK2, and a data file for PK2
  • Commit 5 updates PK2 - Creates an equality delete for PK2, and a data file for PK2
  • Commit 6 does the equality delete compaction

If we compact the equality deletes, then we need to decide when these deletes should be applied. If we apply them at Commit 6, we lose PK1. If we apply them at Commit 2, then we will have a duplicate PK2.
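
A minimal illustration of the rule that makes this tricky: per the Iceberg spec, an equality delete applies only to data files with a strictly lower data sequence number, so rewriting old deletes into a new file (with a new, higher sequence number) changes which rows they hit. The check below is simplified; the real logic also considers partitions and equality field ids.

```java
// Simplified applicability check for an equality delete (sequence numbers only).
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;

class EqDeleteApplicability {
  static boolean applies(DeleteFile eqDelete, DataFile dataFile) {
    // An equality delete hits only data files committed before it. Giving compacted
    // deletes a new sequence number (Commit 6 above) would also delete the PK1 row
    // re-inserted in Commit 3.
    return dataFile.dataSequenceNumber() < eqDelete.dataSequenceNumber();
  }
}
```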

Converting equality deletes to positional deletes with file granularity (Spark-like), or to DVs (Impala-like), could help reduce the number of files that the different readers need to read.

@ismailsimsek
Contributor

ismailsimsek commented Feb 17, 2025

eq deletes are partition scoped, AFAIK this is correct. Would it be more performant to partition the table based on the primary key (id)? In that case equality delete compaction would happen inside the affected partition, so compaction should apply to a smaller data set instead of the full table?
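
If that route is explored, a minimal sketch of evolving the spec to bucket by the primary key (the bucket count and column name are placeholders for this table; note that a spec change only affects newly written files):

```java
// Sketch: add a bucket partition on the primary key so writes and delete compaction
// are scoped to individual buckets. The bucket count (16) is a placeholder.
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;

class BucketByPrimaryKey {
  static void addBucketPartition(Table table) {
    table.updateSpec()
        .addField(Expressions.bucket("id", 16))
        .commit();
  }
}
```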

@eshishki
Author

eshishki commented Feb 17, 2025

i was thinking about bloom filters some more,
it might be costly to check them during planning, but it is certainly alright for a rewrite procedure

we can quickly determine which files really need to be examined, then emit position deletes for them
no need to reshuffle eq delete files, just get rid of them altogether
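
a rough sketch of that pass, purely as an illustration (there is no such built-in procedure; the Row record and the conversion helper are made-up stand-ins for the real scan and commit plumbing):

```java
// Hypothetical maintenance pass: for each candidate data file (pre-filtered via
// bounds / Bloom filters), find rows matching the equality-delete keys and emit
// position deletes for them, so the equality delete files can be dropped.
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

class EqToPosDeleteSketch {
  record Row(String id, long position) {}                   // stand-in for a scanned data row
  record PositionDelete(String filePath, long position) {}  // (path, pos) pair to be written out

  static List<PositionDelete> convert(String dataFilePath, List<Row> rows, Set<String> deletedIds) {
    List<PositionDelete> result = new ArrayList<>();
    for (Row row : rows) {
      if (deletedIds.contains(row.id())) {
        result.add(new PositionDelete(dataFilePath, row.position()));
      }
    }
    return result;
  }
}
```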
