feat(cli): add --workers arg in delete command (datahub-project#12102)
anshbansal authored and sleeperdeep committed Dec 17, 2024
1 parent 11f516e commit 6db28b2
Showing 1 changed file with 66 additions and 20 deletions.
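The new flag parallelizes per-URN deletion inside the DataHub delete CLI. A hypothetical invocation might look like `datahub delete by-filter --platform snowflake --workers 4`; only `--workers` is added by this commit, while the subcommand spelling and the `--platform` filter are assumptions based on the `by_filter` command this diff touches.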
metadata-ingestion/src/datahub/cli/delete_cli.py
@@ -1,4 +1,5 @@
 import logging
+from concurrent.futures import ThreadPoolExecutor, as_completed
 from dataclasses import dataclass
 from datetime import datetime
 from random import choices
@@ -345,6 +346,9 @@ def undo_by_filter(
     default=False,
     help="Only delete soft-deleted entities, for hard deletion",
 )
+@click.option(
+    "--workers", type=int, default=1, help="Num of workers to use for deletion."
+)
 @upgrade.check_upgrade
 @telemetry.with_telemetry()
 def by_filter(
@@ -362,6 +366,7 @@ def by_filter(
     batch_size: int,
     dry_run: bool,
     only_soft_deleted: bool,
+    workers: int = 1,
 ) -> None:
     """Delete metadata from datahub using a single urn or a combination of filters."""

@@ -382,16 +387,19 @@ def by_filter(
     # TODO: add some validation on entity_type

     if not force and not soft and not dry_run:
+        message = (
+            "Hard deletion will permanently delete data from DataHub and can be slow. "
+            "We generally recommend using soft deletes instead. "
+            "Do you want to continue?"
+        )
         if only_soft_deleted:
             click.confirm(
-                "This will permanently delete data from DataHub. Do you want to continue?",
+                message,
                 abort=True,
             )
         else:
             click.confirm(
-                "Hard deletion will permanently delete data from DataHub and can be slow. "
-                "We generally recommend using soft deletes instead. "
-                "Do you want to continue?",
+                message,
                 abort=True,
             )

@@ -462,26 +470,64 @@ def by_filter(
             abort=True,
         )

-    urns_iter = urns
-    if not delete_by_urn and not dry_run:
-        urns_iter = progressbar.progressbar(urns, redirect_stdout=True)
+    _delete_urns_parallel(
+        graph=graph,
+        urns=urns,
+        aspect_name=aspect,
+        soft=soft,
+        dry_run=dry_run,
+        delete_by_urn=delete_by_urn,
+        start_time=start_time,
+        end_time=end_time,
+        workers=workers,
+    )

-    # Run the deletion.
+
+def _delete_urns_parallel(
+    graph: DataHubGraph,
+    urns: List[str],
+    delete_by_urn: bool,
+    start_time: Optional[datetime],
+    end_time: Optional[datetime],
+    aspect_name: Optional[str] = None,
+    soft: bool = True,
+    dry_run: bool = False,
+    workers: int = 1,
+) -> None:
     deletion_result = DeletionResult()
-    with PerfTimer() as timer:
-        for urn in urns_iter:
-            one_result = _delete_one_urn(
-                graph=graph,
-                urn=urn,
-                aspect_name=aspect,
-                soft=soft,
-                dry_run=dry_run,
-                start_time=start_time,
-                end_time=end_time,
-            )
-            deletion_result.merge(one_result)
+
+    def process_urn(urn):
+        return _delete_one_urn(
+            graph=graph,
+            urn=urn,
+            aspect_name=aspect_name,
+            soft=soft,
+            dry_run=dry_run,
+            start_time=start_time,
+            end_time=end_time,
+        )
+
+    with PerfTimer() as timer, ThreadPoolExecutor(max_workers=workers) as executor:
+        future_to_urn = {executor.submit(process_urn, urn): urn for urn in urns}
+        completed_futures = as_completed(future_to_urn)
+        if not delete_by_urn and not dry_run:
+            futures_iter = progressbar.progressbar(
+                as_completed(future_to_urn),
+                max_value=len(future_to_urn),
+                redirect_stdout=True,
+            )
+        else:
+            futures_iter = completed_futures
+
+        for future in futures_iter:
+            try:
+                one_result = future.result()
+                deletion_result.merge(one_result)
+            except Exception as e:
+                urn = future_to_urn[future]
+                click.secho(f"Error processing URN {urn}: {e}", fg="red")

     # Report out a summary of the deletion result.
     click.echo(
         deletion_result.format_message(
             dry_run=dry_run, soft=soft, time_sec=timer.elapsed_seconds()

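The heart of the change is a standard fan-out pattern: submit one deletion task per URN to a ThreadPoolExecutor, keep a future-to-URN map so failures can name the URN that caused them, and drain results with as_completed so progress can be reported as work finishes. The following is a minimal, self-contained sketch of that pattern; the names delete_one and delete_parallel are illustrative stand-ins, not DataHub's API.

from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List


def delete_one(urn: str) -> int:
    # Stand-in for the per-URN deletion call; returns a deleted-row count.
    return 1


def delete_parallel(urns: List[str], workers: int = 1) -> int:
    deleted = 0
    with ThreadPoolExecutor(max_workers=workers) as executor:
        # Map each future back to the URN it was submitted for, so an
        # error message can identify the URN that failed.
        future_to_urn = {executor.submit(delete_one, urn): urn for urn in urns}
        # as_completed yields futures as they finish, regardless of submit order.
        for future in as_completed(future_to_urn):
            try:
                deleted += future.result()
            except Exception as e:
                # One failing URN should not abort the rest of the batch.
                print(f"Error processing URN {future_to_urn[future]}: {e}")
    return deleted


if __name__ == "__main__":
    urns = [f"urn:li:corpuser:user{i}" for i in range(5)]
    print(delete_parallel(urns, workers=3))

Two details of the committed version are worth noting: wrapping as_completed in progressbar.progressbar(..., max_value=len(future_to_urn)) yields a bounded progress bar even though completion order is nondeterministic, and the default of workers=1 keeps behavior effectively sequential, so parallelism is strictly opt-in.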