Tracks progress for package creation, upload and kickoff #2935

Merged: 17 commits, Feb 4, 2025
4 changes: 4 additions & 0 deletions flytekit/loggers.py
@@ -186,5 +186,9 @@ def get_level_from_cli_verbosity(verbosity: int) -> int:
return logging.DEBUG


def is_display_progress_enabled() -> bool:
return os.getenv(LOGGING_RICH_FMT_ENV_VAR, False)


# Default initialization
initialize_global_loggers()
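
As written, `is_display_progress_enabled` returns whatever `os.getenv` yields, so any non-empty value of the variable enables the rich progress display. A minimal sketch of toggling it, assuming the `LOGGING_RICH_FMT_ENV_VAR` constant used above is importable from `flytekit.loggers` alongside the new helper:

```python
# Minimal sketch (not part of the diff): exercising the new helper.
# Assumes LOGGING_RICH_FMT_ENV_VAR is importable from flytekit.loggers.
import os

from flytekit.loggers import LOGGING_RICH_FMT_ENV_VAR, is_display_progress_enabled

# Variable unset (or defaulted) -> falsy, progress rendering stays off.
os.environ.pop(LOGGING_RICH_FMT_ENV_VAR, None)
assert not is_display_progress_enabled()

# Any non-empty value is returned as-is by os.getenv and is truthy,
# so the check is presence-based rather than value-based.
os.environ[LOGGING_RICH_FMT_ENV_VAR] = "1"
assert is_display_progress_enabled()
```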
20 changes: 19 additions & 1 deletion flytekit/remote/remote.py
@@ -31,6 +31,7 @@
import requests
from flyteidl.admin.signal_pb2 import Signal, SignalListRequest, SignalSetRequest
from flyteidl.core import literals_pb2
from rich.progress import Progress, TextColumn, TimeElapsedColumn

from flytekit import ImageSpec
from flytekit.clients.friendly import SynchronousFlyteClient
@@ -66,7 +67,7 @@
FlyteEntityNotExistException,
FlyteValueException,
)
from flytekit.loggers import developer_logger, logger
from flytekit.loggers import developer_logger, is_display_progress_enabled, logger
from flytekit.models import common as common_models
from flytekit.models import filters as filter_models
from flytekit.models import launch_plan as launch_plan_models
@@ -1168,6 +1169,13 @@ def upload_file(
encoded_md5 = b64encode(md5_bytes)
local_file_path = str(to_upload)
content_length = os.stat(local_file_path).st_size

upload_package_progress = Progress(TimeElapsedColumn(), TextColumn("[progress.description]{task.description}"))
t1 = upload_package_progress.add_task(f"Uploading package of size {content_length/1024/1024:.2f} MBs", total=1)
upload_package_progress.start_task(t1)
if is_display_progress_enabled():
upload_package_progress.start()

with open(local_file_path, "+rb") as local_file:
headers = {"Content-Length": str(content_length), "Content-MD5": encoded_md5}
headers.update(extra_headers)
@@ -1187,6 +1195,16 @@
f"Request to send data {upload_location.signed_url} failed.\nResponse: {rsp.text}",
)

upload_package_progress.update(
t1,
completed=1,
description=f"Uploaded package of size {content_length/1024/1024:.2f}MB",
refresh=True,
)
upload_package_progress.stop_task(t1)
if is_display_progress_enabled():
upload_package_progress.stop()

developer_logger.debug(
f"Uploading {to_upload} to {upload_location.signed_url} native url {upload_location.native_url}"
)
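
The upload path drives a single-task `rich.progress.Progress` (elapsed time plus a text description); the task is always created and updated, but the live display is only started when `is_display_progress_enabled()` is true. A standalone sketch of the same pattern, with a hypothetical `do_upload` standing in for the signed-URL PUT:

```python
# Sketch (not the PR itself) of the gated single-task progress pattern in upload_file.
# do_upload is a hypothetical stand-in for the actual signed-URL PUT request.
import os
import time

from rich.progress import Progress, TextColumn, TimeElapsedColumn


def do_upload(path: str) -> None:
    time.sleep(0.5)  # placeholder for requests.put(signed_url, data=...)


def upload_with_progress(path: str, display: bool) -> None:
    size_mb = os.stat(path).st_size / 1024 / 1024
    progress = Progress(TimeElapsedColumn(), TextColumn("[progress.description]{task.description}"))
    task = progress.add_task(f"Uploading package of size {size_mb:.2f} MB", total=1)
    progress.start_task(task)
    if display:  # mirrors the is_display_progress_enabled() gate
        progress.start()
    try:
        do_upload(path)
        progress.update(task, completed=1, description=f"Uploaded package of size {size_mb:.2f} MB", refresh=True)
    finally:
        progress.stop_task(task)
        if display:
            progress.stop()


# Example usage:
# upload_with_progress("flyte-package.tgz", display=True)
```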
66 changes: 64 additions & 2 deletions flytekit/tools/fast_registration.py
@@ -17,14 +17,20 @@

import click
from rich import print as rich_print
from rich.progress import (
BarColumn,
Progress,
TextColumn,
TimeElapsedColumn,
)
from rich.tree import Tree

from flytekit.constants import CopyFileDetection
from flytekit.core.context_manager import FlyteContextManager
from flytekit.core.python_auto_container import PICKLE_FILE_PATH
from flytekit.core.utils import timeit
from flytekit.exceptions.user import FlyteDataNotFoundException
from flytekit.loggers import logger
from flytekit.loggers import is_display_progress_enabled, logger
from flytekit.tools.ignore import DockerIgnore, FlyteIgnore, GitIgnore, Ignore, IgnoreGroup, StandardIgnore
from flytekit.tools.script_mode import _filehash_update, _pathhash_update, ls_files, tar_strip_file_attributes

@@ -120,6 +126,18 @@ def fast_package(
if options and (
options.copy_style == CopyFileDetection.LOADED_MODULES or options.copy_style == CopyFileDetection.ALL
):
create_tarball_progress = Progress(
TimeElapsedColumn(),
TextColumn("[progress.description]{task.description}."),
BarColumn(),
TextColumn("{task.fields[files_added_progress]}"),
)

compress_tarball_progress = Progress(
TimeElapsedColumn(),
TextColumn("[progress.description]{task.description}"),
)

ls, ls_digest = ls_files(str(source), options.copy_style, deref_symlinks, ignore)
logger.debug(f"Hash digest: {ls_digest}")

@@ -130,13 +148,30 @@
archive_fname = f"{FAST_PREFIX}{ls_digest}{FAST_FILEENDING}"
if output_dir is None:
output_dir = tempfile.mkdtemp()
click.secho(f"No output path provided, using a temporary directory at {output_dir} instead", fg="yellow")
click.secho(
f"No output path provided, using a temporary directory at {output_dir} instead",
fg="yellow",
)
archive_fname = os.path.join(output_dir, archive_fname)

# add the tarfile task to progress and start it
total_files = len(ls)
files_processed = 0
tar_task = create_tarball_progress.add_task(
f"Creating tarball with [{total_files}] files...",
total=total_files,
files_added_progress=f"{files_processed}/{total_files} files",
)

if is_display_progress_enabled():
create_tarball_progress.start()

create_tarball_progress.start_task(tar_task)
with tempfile.TemporaryDirectory() as tmp_dir:
tar_path = os.path.join(tmp_dir, "tmp.tar")
with tarfile.open(tar_path, "w", dereference=deref_symlinks) as tar:
for ws_file in ls:
files_processed = files_processed + 1
rel_path = os.path.relpath(ws_file, start=source)
tar.add(
os.path.join(source, ws_file),
@@ -145,7 +180,34 @@
filter=lambda x: tar_strip_file_attributes(x),
)

create_tarball_progress.update(
tar_task,
advance=1,
description=f"Added file {rel_path}",
refresh=True,
files_added_progress=f"{files_processed}/{total_files} files",
)

create_tarball_progress.stop_task(tar_task)
if is_display_progress_enabled():
create_tarball_progress.stop()
compress_tarball_progress.start()

tpath = pathlib.Path(tar_path)
size_mbs = tpath.stat().st_size / 1024 / 1024
compress_task = compress_tarball_progress.add_task(f"Compressing tarball size {size_mbs:.2f}MB...", total=1)
compress_tarball_progress.start_task(compress_task)
compress_tarball(tar_path, archive_fname)
arpath = pathlib.Path(archive_fname)
asize_mbs = arpath.stat().st_size / 1024 / 1024
compress_tarball_progress.update(
compress_task,
advance=1,
description=f"Tarball {size_mbs:.2f}MB compressed to {asize_mbs:.2f}MB",
)
compress_tarball_progress.stop_task(compress_task)
if is_display_progress_enabled():
compress_tarball_progress.stop()

# Original tar command - This condition to be removed in the future after serialize is removed.
else:
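
Tarball creation uses a bar-based `Progress` whose extra `files_added_progress` task field is rendered through `TextColumn("{task.fields[files_added_progress]}")`, while compression gets a simpler elapsed-time display. A condensed, runnable sketch of the custom-field pattern, using a throwaway workspace rather than the real file list:

```python
# Sketch of the per-file progress pattern used for tarball creation.
# The workspace and file names are hypothetical; custom task fields must be
# supplied at add_task() time and on every update() that renders them.
import os
import tarfile
import tempfile

from rich.progress import BarColumn, Progress, TextColumn, TimeElapsedColumn

# Build a throwaway workspace so the sketch runs end to end.
workspace = tempfile.mkdtemp()
files = ["wf.py", "extract.py", "load.py"]  # hypothetical workspace files
for name in files:
    with open(os.path.join(workspace, name), "w") as f:
        f.write("# placeholder\n")

total = len(files)
progress = Progress(
    TimeElapsedColumn(),
    TextColumn("[progress.description]{task.description}."),
    BarColumn(),
    TextColumn("{task.fields[files_added_progress]}"),
)
task = progress.add_task(
    f"Creating tarball with [{total}] files...",
    total=total,
    files_added_progress=f"0/{total} files",
)

with progress:  # context-manager form of start()/stop(); the PR gates these on the env flag
    with tarfile.open(os.path.join(workspace, "tmp.tar"), "w") as tar:
        for i, rel_path in enumerate(files, start=1):
            tar.add(os.path.join(workspace, rel_path), arcname=rel_path)
            progress.update(
                task,
                advance=1,
                description=f"Added file {rel_path}",
                files_added_progress=f"{i}/{total} files",
                refresh=True,
            )
```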