Skip to content

Commit

Permalink
Merge pull request #124 from nansencenter/download_failures
Browse files Browse the repository at this point in the history
Manage errors for HTTP downloads
  • Loading branch information
aperrin66 authored Dec 10, 2024
2 parents ba84200 + ab8b05a commit 198fec6
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 59 deletions.
120 changes: 69 additions & 51 deletions geospaas_processing/downloaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ class DownloadError(Exception):
"""Download failed"""


# Deliberately inherits from Exception rather than DownloadError: code that
# catches DownloadError (e.g. to move on to a dataset's next URL) must let
# this exception propagate so the task layer can schedule a retry instead.
class RetriableDownloadError(Exception):
    """Download failed but might work if retried"""

# Subclass of DownloadError, so it is caught wherever DownloadError is
# handled; raise it when the URL is permanently dead rather than
# temporarily unreachable.
class ObsoleteURLError(DownloadError):
    """The URL no longer points to a downloadable dataset"""

Expand Down Expand Up @@ -346,6 +350,8 @@ def get_file_name(cls, url, connection, **kwargs):
and url_file_name.endswith('.nc')):
return url_file_name

LOGGER.error("Could not find file name from HTTP response for %s: %s, %s, %s",
url, connection.status_code, connection.reason, connection.headers)
return ''

@classmethod
Expand All @@ -365,6 +371,8 @@ def connect(cls, url, auth=(None, None), **kwargs):
raise DownloadError(
f"Could not download from '{url}'; response: {details}"
) from error
except (requests.ConnectionError, requests.Timeout) as error:
raise RetriableDownloadError(f"Failed to connect to {url}") from error
except requests.RequestException as error:
raise DownloadError(
f"Could not download from '{url}'"
Expand Down Expand Up @@ -399,14 +407,13 @@ def download_file(cls, file, url, connection):
`connection` argument
"""
chunk = None
for chunk in connection.iter_content(chunk_size=cls.CHUNK_SIZE):
file.write(chunk)
else:
# This executes after the loop and raises an error if the
# response is unexpectedly empty like it sometimes happens
# with scihub
if chunk is None:
raise DownloadError(f"Getting an empty file from '{url}'")
try:
for chunk in connection.iter_content(chunk_size=cls.CHUNK_SIZE):
file.write(chunk)
except requests.exceptions.ChunkedEncodingError as error:
raise RetriableDownloadError(f"Download from {url} was interrupted") from error
if chunk is None:
raise DownloadError(f"Getting an empty file from '{url}'")


class FTPDownloader(Downloader):
Expand Down Expand Up @@ -605,6 +612,51 @@ def already_downloaded(cls, dataset_directory):
return True
return False

def _download_from_uri(self, dataset_uri, directory):
    """Download the file(s) from `dataset_uri` to `directory`.

    Returns a `(file_name, download_error)` tuple: on success,
    `file_name` is the name of the downloaded file and
    `download_error` is None; on a recoverable failure, `file_name`
    is None and `download_error` holds the caught `DownloadError` so
    the caller can record it and try the dataset's other URIs.

    Raises:
        TooManyDownloadsError: the maximum number of parallel
            downloads for this provider has been reached
        KeyError: no downloader is registered for the URI's service
        DownloadError: the downloaded file could not be written to disk

    Note: a `RetriableDownloadError` raised by the downloader is not a
    `DownloadError`, so it propagates to the caller untouched.
    """
    # Get the extra settings for the provider; the provider is
    # identified by the scheme://netloc prefix of the URI
    dataset_uri_prefix = "://".join(requests.utils.urlparse(dataset_uri.uri)[0:2])
    # Find provider settings
    extra_settings = self.get_provider_settings(dataset_uri_prefix)
    if extra_settings:
        LOGGER.debug("Loaded extra settings for provider %s: %s",
                     dataset_uri_prefix, extra_settings)
    # Launch download if the maximum number of parallel downloads has not been reached
    with DownloadLock(dataset_uri_prefix,
                      extra_settings.get('max_parallel_downloads'),
                      utils.REDIS_HOST, utils.REDIS_PORT) as acquired:
        if not acquired:
            raise TooManyDownloadsError(
                f"Too many downloads in progress for {dataset_uri_prefix}")
        # Try to find a downloader
        try:
            downloader = self.DOWNLOADERS[dataset_uri.service]
        except KeyError:
            LOGGER.error("No downloader found for %s service",
                         dataset_uri.service, exc_info=True)
            raise

        LOGGER.debug("Attempting to download from '%s'", dataset_uri.uri)
        file_name = None
        download_error = None
        try:
            file_name = downloader.check_and_download_url(
                url=dataset_uri.uri, download_dir=directory,
                **extra_settings)
        except DownloadError as error:
            LOGGER.warning(
                ("Failed to download dataset %s from %s. "
                 "Another URL will be tried if possible"),
                dataset_uri.dataset.pk, dataset_uri.uri, exc_info=True)
            # Keep the error instead of raising it so the caller can
            # try the dataset's other URIs
            download_error = error
            # Clean up any partially downloaded files
            shutil.rmtree(directory, ignore_errors=True)
        except (FileNotFoundError, IsADirectoryError) as error:
            shutil.rmtree(directory, ignore_errors=True)
            raise DownloadError(
                f"Could not write the downloaded file to {error.filename}") from error

    return file_name, download_error

def download_dataset(self, dataset, download_directory):
"""
Attempt to download a dataset by trying its URIs one by one. For each `DatasetURI`, it
Expand All @@ -625,49 +677,15 @@ def download_dataset(self, dataset, download_directory):
else:
os.makedirs(full_dataset_directory, exist_ok=True)
for dataset_uri in dataset.dataseturi_set.all():
# Get the extra settings for the provider
dataset_uri_prefix = "://".join(requests.utils.urlparse(dataset_uri.uri)[0:2])
# Find provider settings
extra_settings = self.get_provider_settings(dataset_uri_prefix)
if extra_settings:
LOGGER.debug("Loaded extra settings for provider %s: %s",
dataset_uri_prefix, extra_settings)
# Launch download if the maximum number of parallel downloads has not been reached
with DownloadLock(dataset_uri_prefix,
extra_settings.get('max_parallel_downloads'),
utils.REDIS_HOST, utils.REDIS_PORT) as acquired:
if not acquired:
raise TooManyDownloadsError(
f"Too many downloads in progress for {dataset_uri_prefix}")
# Try to find a downloader
try:
downloader = self.DOWNLOADERS[dataset_uri.service]
except KeyError:
LOGGER.error("No downloader found for %s service",
dataset_uri.service, exc_info=True)
raise

LOGGER.debug("Attempting to download from '%s'", dataset_uri.uri)
try:
file_name = downloader.check_and_download_url(
url=dataset_uri.uri, download_dir=full_dataset_directory,
**extra_settings)
except DownloadError as error:
LOGGER.warning(
("Failed to download dataset %s from %s. "
"Another URL will be tried if possible"),
dataset.pk, dataset_uri.uri, exc_info=True)
errors.append(error)
shutil.rmtree(full_dataset_directory, ignore_errors=True)
except (FileNotFoundError, IsADirectoryError) as error:
shutil.rmtree(full_dataset_directory, ignore_errors=True)
raise DownloadError(
f"Could not write the downloaded file to {error.filename}") from error
else:
dataset_path = os.path.join(dataset_directory, file_name)
LOGGER.info("Successfully downloaded dataset %d to %s",
dataset.pk, dataset_path)
break
file_name, download_error = self._download_from_uri(dataset_uri,
full_dataset_directory)
if file_name:
dataset_path = os.path.join(dataset_directory, file_name)
LOGGER.info("Successfully downloaded dataset %d to %s",
dataset_uri.dataset.pk, dataset_path)
break
if download_error:
errors.append(download_error)

if file_name:
if self.save_path:
Expand Down
4 changes: 2 additions & 2 deletions geospaas_processing/tasks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
FaultTolerantTask,
WORKING_DIRECTORY,
PROVIDER_SETTINGS_PATH)
from ..downloaders import DownloadManager, TooManyDownloadsError
from ..downloaders import DownloadManager, RetriableDownloadError, TooManyDownloadsError

from . import app, DATASET_LOCK_PREFIX

Expand All @@ -38,7 +38,7 @@ def download(self, args):
except IndexError:
logger.error("Nothing was downloaded for dataset %s", dataset_id, exc_info=True)
raise
except TooManyDownloadsError:
except (TooManyDownloadsError, RetriableDownloadError):
# Stop retrying after 24 hours
self.retry((args,), countdown=90, max_retries=960)
except OSError as error:
Expand Down
31 changes: 25 additions & 6 deletions tests/test_downloaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,6 @@ def test_build_oauth2_authentication_unknown_placement(self):
'username', 'password', 'token_url', 'client_id',
token_placement='foo', token_parameter_name='token')


def test_get_oauth2_auth_no_totp(self):
"""Test getting an OAuth2 authentication from get_auth()"""
mock_auth = mock.Mock()
Expand Down Expand Up @@ -446,7 +445,8 @@ def test_get_file_name_no_header(self):
"""
response = requests.Response()
response.status_code = 200
self.assertEqual(downloaders.HTTPDownloader.get_file_name('url', response), '')
with self.assertLogs(level=logging.ERROR):
self.assertEqual(downloaders.HTTPDownloader.get_file_name('url', response), '')

def test_get_file_name_no_filename_in_header(self):
"""`get_file_name()` must return an empty string if the
Expand All @@ -455,7 +455,8 @@ def test_get_file_name_no_filename_in_header(self):
response = requests.Response()
response.status_code = 202
response.headers['Content-Disposition'] = ''
self.assertEqual(downloaders.HTTPDownloader.get_file_name('url', response), '')
with self.assertLogs(level=logging.ERROR):
self.assertEqual(downloaders.HTTPDownloader.get_file_name('url', response), '')

def test_get_file_name_multiple_possibilities(self):
"""An error must be raised if several file names are found in the header"""
Expand Down Expand Up @@ -497,12 +498,22 @@ def test_connect_error_code(self):
downloaders.HTTPDownloader.connect('url')
self.assertIsInstance(error.exception.__cause__, requests.HTTPError)

def test_connect_request_exception(self):
"""An exception must be raised if an error prevents the
connection from happening

def test_connect_connection_exception(self):
    """connect() must raise a RetriableDownloadError when a connection
    error prevents the request from completing
    """
    patched_request = mock.patch('geospaas_processing.utils.http_request',
                                 side_effect=requests.ConnectionError)
    with patched_request, self.assertRaises(downloaders.RetriableDownloadError):
        downloaders.HTTPDownloader.connect('url')

def test_connect_request_exception(self):
    """A DownloadError must be raised if another error prevents
    the connection from happening
    """
    with mock.patch('geospaas_processing.utils.http_request',
                    side_effect=requests.TooManyRedirects):
        with self.assertRaises(downloaders.DownloadError):
            downloaders.HTTPDownloader.connect('url')

Expand Down Expand Up @@ -554,6 +565,14 @@ def test_download_empty_file(self):
with self.assertRaises(downloaders.DownloadError):
downloaders.HTTPDownloader.download_file(mock.Mock(), 'url', response)

def test_download_interrupted_connection(self):
    """download_file() must raise a RetriableDownloadError when the
    transfer is cut short by a ChunkedEncodingError
    """
    connection = mock.Mock()
    connection.iter_content.side_effect = requests.exceptions.ChunkedEncodingError
    with self.assertRaises(downloaders.RetriableDownloadError):
        downloaders.HTTPDownloader.download_file(mock.Mock(), 'url', connection)


class URLOAuth2TestCase(unittest.TestCase):
"""Tests for the URLOAuth2 class"""
Expand Down

0 comments on commit 198fec6

Please sign in to comment.