Skip to content

Commit

Permalink
laxydl: Add retries to downloads.
Browse files Browse the repository at this point in the history
Call sync after download is complete.
Try to catch incomplete downloads.
  • Loading branch information
pansapiens committed Oct 22, 2024
1 parent 7a03dff commit 25e8f08
Showing 1 changed file with 51 additions and 39 deletions.
90 changes: 51 additions & 39 deletions laxy_downloader/laxy_downloader/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import asks
from attrdict import AttrDict
from filelock import FileLock
from contextlib import contextmanager

logger = logging.getLogger(__name__)
# logging.basicConfig(format='%(levelname)s: %(asctime)s -- %(message)s', level=logging.INFO)
Expand Down Expand Up @@ -158,6 +159,18 @@ def request_with_retries(*args, **kwargs):
return response


@contextmanager
def temporary_file(mode="wb", dir=None, prefix=None, suffix=None):
tmp = tempfile.NamedTemporaryFile(
mode=mode, dir=dir, prefix=prefix, suffix=suffix, delete=False
)
try:
yield tmp
finally:
tmp.close()
os.unlink(tmp.name)


def download_url(
url: str,
filepath: Union[str, Path],
Expand All @@ -169,6 +182,7 @@ def download_url(
check_existing_size: bool = True,
remove_existing: bool = True,
chunk_size: int = 1024,
max_retries: int = 3,
) -> str:
"""
Download a file from a given URL to a specified filepath, with support for both HTTP(S) and FTP protocols.
Expand Down Expand Up @@ -209,10 +223,8 @@ def download_url(

scheme = urlparse(url).scheme

tmpfilepath = None
status_code = None
lock_path = f"{filepath}.lock"
lock = FileLock(lock_path, timeout=600) # 10 minutes timeout
lock = FileLock(lock_path, timeout=60 * 60 * 12) # 12 hours timeout

try:
with lock:
Expand All @@ -223,52 +235,52 @@ def download_url(
content_length
):
logger.info(
f"File of correct size {filepath} ({content_length} bytes) already "
"exists, skipping download"
f"File of correct size {filepath} ({content_length} bytes) already exists, skipping download"
)
return filepath
elif remove_existing:
logger.info(
f"File exists {filepath} but is incorrect size "
f"({content_length} bytes). Deleting existing file."
f"File exists {filepath} but is incorrect size ({content_length} bytes). Deleting existing file."
)
os.remove(filepath)

logger.info(f"Starting download: {url} ({url_to_cache_key(url)})")
with tempfile.NamedTemporaryFile(
mode="wb",
dir=directory,
prefix=f"{filename}.",
suffix=".tmp",
delete=False,
) as tmpfile:
tmpfilepath = str(Path(directory, tmpfile.name))
if scheme == "ftp":
urllib.request.urlretrieve(url, filename=tmpfilepath)
else:
with closing(
request_with_retries(
"GET", url, stream=True, headers=headers, auth=auth
)
) as download:
status_code = download.status_code
for chunk in download.iter_content(chunk_size=chunk_size):
tmpfile.write(chunk)
tmpfile.flush()

file_size = os.path.getsize(tmpfilepath)
if content_length is not None and file_size < int(content_length):
raise Exception(
f"Downloaded file size ({file_size}) appears incomplete based on "
f"Content-Length header ({content_length})."
)
for attempt in range(max_retries):
logger.info(
f"Starting download (attempt {attempt + 1}/{max_retries}): {url} ({url_to_cache_key(url)})"
)
with temporary_file(
mode="wb", dir=directory, prefix=f"{filename}.", suffix=".tmp"
) as tmpfile:
if scheme == "ftp":
urllib.request.urlretrieve(url, filename=tmpfile.name)
else:
with closing(
request_with_retries(
"GET", url, stream=True, headers=headers, auth=auth
)
) as download:
download.raise_for_status()
for chunk in download.iter_content(chunk_size=chunk_size):
tmpfile.write(chunk)

shutil.move(tmpfilepath, filepath)
tmpfile.flush()
os.fsync(tmpfile.fileno())

file_size = os.path.getsize(tmpfile.name)
if content_length is not None and file_size == int(content_length):
shutil.move(tmpfile.name, filepath)
return filepath
elif attempt < max_retries - 1:
logger.warning(
f"Downloaded file size ({file_size}) does not match Content-Length ({content_length}). Retrying..."
)
else:
raise Exception(
f"Downloaded file size ({file_size}) does not match Content-Length ({content_length}) after {max_retries} attempts."
)

except Exception as e:
handle_download_exception(
e, status_code, url, cleanup_on_exception, tmpfilepath
)
handle_download_exception(e, None, url, cleanup_on_exception, None)
finally:
try:
os.remove(lock_path)
Expand Down

0 comments on commit 25e8f08

Please sign in to comment.