Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cli): add --workers arg in delete command #12102

Merged
merged 2 commits into from
Dec 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 66 additions & 20 deletions metadata-ingestion/src/datahub/cli/delete_cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from datetime import datetime
from random import choices
Expand Down Expand Up @@ -345,6 +346,9 @@ def undo_by_filter(
default=False,
help="Only delete soft-deleted entities, for hard deletion",
)
@click.option(
"--workers", type=int, default=1, help="Num of workers to use for deletion."
)
@upgrade.check_upgrade
@telemetry.with_telemetry()
def by_filter(
Expand All @@ -362,6 +366,7 @@ def by_filter(
batch_size: int,
dry_run: bool,
only_soft_deleted: bool,
workers: int = 1,
) -> None:
"""Delete metadata from datahub using a single urn or a combination of filters."""

Expand All @@ -382,16 +387,19 @@ def by_filter(
# TODO: add some validation on entity_type

if not force and not soft and not dry_run:
message = (
"Hard deletion will permanently delete data from DataHub and can be slow. "
"We generally recommend using soft deletes instead. "
"Do you want to continue?"
)
if only_soft_deleted:
click.confirm(
"This will permanently delete data from DataHub. Do you want to continue?",
message,
abort=True,
)
else:
click.confirm(
"Hard deletion will permanently delete data from DataHub and can be slow. "
"We generally recommend using soft deletes instead. "
"Do you want to continue?",
message,
abort=True,
)

Expand Down Expand Up @@ -462,26 +470,64 @@ def by_filter(
abort=True,
)

urns_iter = urns
if not delete_by_urn and not dry_run:
urns_iter = progressbar.progressbar(urns, redirect_stdout=True)
_delete_urns_parallel(
graph=graph,
urns=urns,
aspect_name=aspect,
soft=soft,
dry_run=dry_run,
delete_by_urn=delete_by_urn,
start_time=start_time,
end_time=end_time,
workers=workers,
)


# Run the deletion.
def _delete_urns_parallel(
graph: DataHubGraph,
urns: List[str],
delete_by_urn: bool,
start_time: Optional[datetime],
end_time: Optional[datetime],
aspect_name: Optional[str] = None,
soft: bool = True,
dry_run: bool = False,
workers: int = 1,
) -> None:
deletion_result = DeletionResult()
with PerfTimer() as timer:
for urn in urns_iter:
one_result = _delete_one_urn(
graph=graph,
urn=urn,
aspect_name=aspect,
soft=soft,
dry_run=dry_run,
start_time=start_time,
end_time=end_time,

def process_urn(urn):
return _delete_one_urn(
graph=graph,
urn=urn,
aspect_name=aspect_name,
soft=soft,
dry_run=dry_run,
start_time=start_time,
end_time=end_time,
)

with PerfTimer() as timer, ThreadPoolExecutor(max_workers=workers) as executor:
future_to_urn = {executor.submit(process_urn, urn): urn for urn in urns}

completed_futures = as_completed(future_to_urn)
if not delete_by_urn and not dry_run:
futures_iter = progressbar.progressbar(
as_completed(future_to_urn),
max_value=len(future_to_urn),
redirect_stdout=True,
)
deletion_result.merge(one_result)
else:
futures_iter = completed_futures

for future in futures_iter:
try:
one_result = future.result()
deletion_result.merge(one_result)
except Exception as e:
urn = future_to_urn[future]
click.secho(f"Error processing URN {urn}: {e}", fg="red")

# Report out a summary of the deletion result.
click.echo(
deletion_result.format_message(
dry_run=dry_run, soft=soft, time_sec=timer.elapsed_seconds()
Expand Down
Loading