
Fuzzy Duplicates Identification fails on batched_merge_and_write when document dataset is read with blocksize #462

Status: Open
praateekmahajan opened this issue Jan 2, 2025 · 1 comment
Labels: bug (Something isn't working)

praateekmahajan (Collaborator) commented on Jan 2, 2025:

When reading a dataset with `DocumentDataset.read_parquet(..., blocksize=???, files_per_partition=None)` and running fuzzy dedup (protocol=ucx, false-positive check enabled), we run into an error during the `shuffle_docs_on_buckets` -> `_batched_merge_and_write` step.
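A minimal sketch of the setup that hits this (hedged: the paths, field names, and the exact `get_client` / `FuzzyDuplicatesConfig` arguments below are illustrative, not taken from the failing run):

```python
from nemo_curator import FuzzyDuplicates, FuzzyDuplicatesConfig
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.distributed_utils import get_client

# Connect to a GPU Dask cluster over the UCX protocol (arguments illustrative).
client = get_client(cluster_type="gpu", protocol="ucx")

# Reading by blocksize (rather than files_per_partition) is what triggers the failure.
dataset = DocumentDataset.read_parquet(
    "/data/input/",
    blocksize="1GB",  # any blocksize value reproduces it
    files_per_partition=None,
    backend="cudf",
)

fuzzy_dup = FuzzyDuplicates(
    config=FuzzyDuplicatesConfig(
        cache_dir="/data/fuzzy_cache/",
        id_field="id",
        text_field="text",
        false_positive_check=True,  # Stage 3 (false-positive check) is where it fails
    )
)
duplicates = fuzzy_dup(dataset=dataset)  # errors in shuffle_docs_on_buckets -> _batched_merge_and_write
```

The run then fails during the Stage 3 shuffle: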

Stage3 (False Postive Check): Shuffle docs

  0%|          | 0/1 [00:00<?, ?it/s]
Started processing bucket-map partitions 0 through 1 of 1
Using 4 text partitions.
2025-01-02 08:31:21,288 - distributed.worker - ERROR - Compute Failed
Key:       ('read_parquet-fused-assign-7d4479cf1a375160a1452f529c7dfcef', 1)
State:     executing
Task:  <Task ('read_parquet-fused-assign-7d4479cf1a375160a1452f529c7dfcef', 1) _execute_subgraph(...)>
Exception: "ValueError('Cannot align indices with non-unique values')"
Traceback:
  File "/opt/conda/envs/rapids/lib/python3.10/site-packages/dask_expr/_expr.py", line 1849, in assign
    df[name] = val
  File "/opt/conda/envs/rapids/lib/python3.10/site-packages/cudf/utils/performance_tracking.py", line 51, in wrapper
    return func(*args, **kwargs)
  File "/opt/conda/envs/rapids/lib/python3.10/site-packages/cudf/core/dataframe.py", line 1445, in __setitem__
    self.insert(self._num_columns, arg, value)
  File "/opt/conda/envs/rapids/lib/python3.10/site-packages/cudf/utils/performance_tracking.py", line 51, in wrapper
    return func(*args, **kwargs)
  File "/opt/conda/envs/rapids/lib/python3.10/site-packages/cudf/core/dataframe.py", line 3329, in insert
    return self._insert(
  File "/opt/conda/envs/rapids/lib/python3.10/site-packages/cudf/utils/performance_tracking.py", line 51, in wrapper
    return func(*args, **kwargs)
  File "/opt/conda/envs/rapids/lib/python3.10/site-packages/cudf/core/dataframe.py", line 3403, in _insert
    value = value._align_to_index(
  File "/opt/conda/envs/rapids/lib/python3.10/site-packages/cudf/core/indexed_frame.py", line 3739, in _align_to_index
    raise ValueError("Cannot align indices with non-unique values")

2025-01-02 08:31:21,351 - distributed.worker - ERROR - Compute Failed
Key:       getitem-de6fe9f32b5dc94114977026f9696781
State:     executing
Task:  <Task 'getitem-de6fe9f32b5dc94114977026f9696781' getitem(...)>
Exception: 'KeyError(0)'
Traceback: ''


  0%|          | 0/1 [00:01<?, ?it/s]
Traceback (most recent call last):
  File "/benchmark/nemo-curator/scripts/run_curator_with_logs.py", line 1127, in main
    run_curation_pipeline(
  File "/benchmark/nemo-curator/scripts/run_curator_with_logs.py", line 969, in run_curation_pipeline
    curation_steps(dataset)
  File "/opt/conda/envs/rapids/lib/python3.10/site-packages/nemo_curator/modules/meta.py", line 22, in __call__
    dataset = module(dataset)
  File "/benchmark/nemo-curator/scripts/pipeline_utils.py", line 115, in wrapped
    return func(
  File "/benchmark/nemo-curator/scripts/run_curator_with_logs.py", line 404, in fuzzy_dedupe
    duplicates = fuzzy_dup(dataset=dataset)
  File "/opt/conda/envs/rapids/lib/python3.10/site-packages/nemo_curator/modules/fuzzy_dedup.py", line 673, in __call__
    self.jaccard_shuffle.shuffle_docs_on_buckets(
  File "/opt/conda/envs/rapids/lib/python3.10/site-packages/nemo_curator/modules/fuzzy_dedup.py", line 1166, in shuffle_docs_on_buckets
    self._batched_merge_and_write(
  File "/opt/conda/envs/rapids/lib/python3.10/site-packages/nemo_curator/modules/fuzzy_dedup.py", line 1309, in _batched_merge_and_write
    written_files = written_files.compute()
  File "/opt/conda/envs/rapids/lib/python3.10/site-packages/dask_expr/_collection.py", line 480, in compute
    return DaskMethodsMixin.compute(out, **kwargs)
  File "/opt/conda/envs/rapids/lib/python3.10/site-packages/dask/base.py", line 372, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/opt/conda/envs/rapids/lib/python3.10/site-packages/dask/base.py", line 660, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/opt/conda/envs/rapids/lib/python3.10/site-packages/distributed/client.py", line 2427, in _gather
    raise exception.with_traceback(traceback)
KeyError: 0
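
For context, the first worker error is cudf refusing to align a Series against a DataFrame whose index contains duplicate values, presumably related to how the fused `read_parquet` partitions end up indexed. A minimal standalone illustration of that error (assuming cudf's usual alignment behavior; not taken from the failing run):

```python
import cudf

# A DataFrame whose index has duplicate values, as can happen when
# partitions are fused/concatenated without resetting the index.
df = cudf.DataFrame({"a": [1, 2, 3]}, index=[0, 0, 1])

# Assigning a Series with a different index forces an index alignment,
# which cudf rejects when either index is non-unique:
df["b"] = cudf.Series([10, 20, 30], index=[1, 2, 3])
# ValueError: Cannot align indices with non-unique values
```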

Environment

crossfit                           0.0.8
cudf-cu12                          25.2.0a219
cugraph-cu12                       25.2.0a55
cuml-cu12                          25.2.0a38
dask                               2024.12.1
dask-cuda                          25.2.0a14
dask-cudf-cu12                     25.2.0a219
dask-expr                          1.1.21
dask_labextension                  7.0.0
dask-mpi                           2022.4.0
distributed                        2024.12.1
distributed-ucxx-cu12              0.42.0a22
libcudf-cu12                       25.2.0a219
libucx-cu12                        1.17.0.post1
libucxx-cu12                       0.42.0a22
nemo_curator                       0.6.0rc0.dev1
pylibcudf-cu12                     25.2.0a219
pylibcugraph-cu12                  25.2.0a55
raft-dask-cu12                     25.2.0a30
rapids-dask-dependency             25.2.0a9
torch                              2.5.1
ucx-py-cu12                        0.42.0a8
ucxx-cu12                          0.42.0a22
curator d401333ec9d88c36494befc9ae7515574c4d89fb
praateekmahajan added the bug label on Jan 2, 2025.

praateekmahajan (Collaborator, Author) commented:

From @VibhuJawa:

Read through the code. Yes, the blocksize=xyz error makes sense: `_batched_merge_and_write` relies on `total_text_partitions` matching the actual number of text partitions (see fuzzy_dedup.py#L1193C9-L1193C30). With blocksize, run-time fusion changes the partition count, so that assumption no longer holds. We can probably fix it by adding a repartition before this line: https://github.com/NVIDIA/NeMo-Curator/blob/db411b0480b0b84d481505a92553be12332191cf/nemo_curator/modules/fuzzy_dedup.py#L1193C9-L1193C30
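
A sketch of that suggested fix (hypothetical and untested; `text_df` is a placeholder name for the documents DataFrame being merged):

```python
# Placed just before the line referenced above in _batched_merge_and_write:
# force the partition count back to the value the batching logic was derived
# from, so the per-batch partition ranges line up again.
if text_df.npartitions != total_text_partitions:
    text_df = text_df.repartition(npartitions=total_text_partitions)
```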
