From cc6be5b7b860251d1dddd20d5b591161e7c6ef7c Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Thu, 12 Dec 2024 07:57:16 +0530 Subject: [PATCH 1/2] feat(cli): add --workers arg in delete command --- .../src/datahub/cli/delete_cli.py | 86 ++++++++++++++----- 1 file changed, 66 insertions(+), 20 deletions(-) diff --git a/metadata-ingestion/src/datahub/cli/delete_cli.py b/metadata-ingestion/src/datahub/cli/delete_cli.py index a640f941b75276..269db8e954c09f 100644 --- a/metadata-ingestion/src/datahub/cli/delete_cli.py +++ b/metadata-ingestion/src/datahub/cli/delete_cli.py @@ -1,4 +1,5 @@ import logging +from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass from datetime import datetime from random import choices @@ -345,6 +346,9 @@ def undo_by_filter( default=False, help="Only delete soft-deleted entities, for hard deletion", ) +@click.option( + "--workers", type=int, default=1, help="Num of workers to use for deletion." +) @upgrade.check_upgrade @telemetry.with_telemetry() def by_filter( @@ -362,6 +366,7 @@ def by_filter( batch_size: int, dry_run: bool, only_soft_deleted: bool, + workers: int = 1, ) -> None: """Delete metadata from datahub using a single urn or a combination of filters.""" @@ -382,16 +387,19 @@ def by_filter( # TODO: add some validation on entity_type if not force and not soft and not dry_run: + message = ( + "Hard deletion will permanently delete data from DataHub and can be slow. " + "We generally recommend using soft deletes instead. " + "Do you want to continue?" + ) if only_soft_deleted: click.confirm( - "This will permanently delete data from DataHub. Do you want to continue?", + message, abort=True, ) else: click.confirm( - "Hard deletion will permanently delete data from DataHub and can be slow. " - "We generally recommend using soft deletes instead. " - "Do you want to continue?", + message, abort=True, ) @@ -462,26 +470,64 @@ def by_filter( abort=True, ) - urns_iter = urns - if not delete_by_urn and not dry_run: - urns_iter = progressbar.progressbar(urns, redirect_stdout=True) + _delete_urns_parallel( + graph=graph, + urns=urns, + aspect_name=aspect, + soft=soft, + dry_run=dry_run, + delete_by_urn=delete_by_urn, + start_time=start_time, + end_time=end_time, + workers=workers, + ) - # Run the deletion. + +def _delete_urns_parallel( + graph: DataHubGraph, + urns: List[str], + delete_by_urn: bool, + start_time: Optional[datetime], + end_time: Optional[datetime], + aspect_name: Optional[str] = None, + soft: bool = True, + dry_run: bool = False, + workers: int = 1, +) -> None: deletion_result = DeletionResult() - with PerfTimer() as timer: - for urn in urns_iter: - one_result = _delete_one_urn( - graph=graph, - urn=urn, - aspect_name=aspect, - soft=soft, - dry_run=dry_run, - start_time=start_time, - end_time=end_time, + + def process_urn(urn): + return _delete_one_urn( + graph=graph, + urn=urn, + aspect_name=aspect_name, + soft=soft, + dry_run=dry_run, + start_time=start_time, + end_time=end_time, + ) + + with PerfTimer() as timer, ThreadPoolExecutor(max_workers=workers) as executor: + future_to_urn = {executor.submit(process_urn, urn): urn for urn in urns} + + completed_futures = as_completed(future_to_urn) + if not delete_by_urn and not dry_run: + futures_iter = progressbar.progressbar( + as_completed(future_to_urn), + max_value=len(future_to_urn), + redirect_stdout=True, ) - deletion_result.merge(one_result) + else: + futures_iter = completed_futures + + for future in futures_iter: + try: + one_result = future.result() + deletion_result.merge(one_result) + except Exception as e: + urn = future_to_urn[future] + click.secho(f"Error processing URN {urn}: {e}", fg="red") - # Report out a summary of the deletion result. click.echo( deletion_result.format_message( dry_run=dry_run, soft=soft, time_sec=timer.elapsed_seconds() From b74b19f5fef6c098ef05987456feacab7409b3f0 Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Thu, 12 Dec 2024 08:06:17 +0530 Subject: [PATCH 2/2] fix lint --- metadata-ingestion/src/datahub/cli/delete_cli.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/cli/delete_cli.py b/metadata-ingestion/src/datahub/cli/delete_cli.py index 269db8e954c09f..1a75459a92c5cf 100644 --- a/metadata-ingestion/src/datahub/cli/delete_cli.py +++ b/metadata-ingestion/src/datahub/cli/delete_cli.py @@ -389,8 +389,8 @@ def by_filter( if not force and not soft and not dry_run: message = ( "Hard deletion will permanently delete data from DataHub and can be slow. " - "We generally recommend using soft deletes instead. " - "Do you want to continue?" + "We generally recommend using soft deletes instead. " + "Do you want to continue?" ) if only_soft_deleted: click.confirm(