From 917ea60391a2cb3250f4118cae12fa5fe54a5520 Mon Sep 17 00:00:00 2001 From: Aidan McMahon-Smith Date: Mon, 29 Jan 2024 13:21:14 +0100 Subject: [PATCH] Add clearer retry logic with logging [RHELDST-9679] Poor retry logic in this library has made some pub tasks fail. The logs are also unclear as to whether retries are occurring or not. This change extends the urllib3 retry logic to include logging to resolve both issues. It also introduces new envvars to make the retry periods configurable. The default backoff settings retry the request 10 times over a 5 minute period. --- fastpurge/_client.py | 75 +++++++++++++++++++++++++++++++++++-------- test-requirements.txt | 1 + tests/test_purge.py | 46 +++++++++++++++++++------- 3 files changed, 97 insertions(+), 25 deletions(-) diff --git a/fastpurge/_client.py b/fastpurge/_client.py index daa91fe..4ed1ba6 100644 --- a/fastpurge/_client.py +++ b/fastpurge/_client.py @@ -6,6 +6,10 @@ from threading import local, Lock import requests +from requests.adapters import HTTPAdapter +from requests.exceptions import RetryError +from urllib3.util import Retry +from http import HTTPStatus try: from time import monotonic @@ -32,6 +36,29 @@ ]) +class LoggingRetry(Retry): + def __init__(self, *args, **kwargs, ): + self._logger = kwargs.pop('logger', None) + super(LoggingRetry, self).__init__(*args, **kwargs) + + def new(self, **kw): + kw['logger'] = self._logger + return super(LoggingRetry, self).new(**kw) + + def increment(self, method, url, *args, **kwargs): + response = kwargs.get("response") + if response: + self._logger.error("An invalid status code %s was received " + "when trying to %s to %s: %s", + response.status, method, url, response.reason) + else: # pragma: no cover + self._logger.error( + "An unknown error occurred when trying to %s to %s", method, + url) + return super(LoggingRetry, self).increment(method, url, *args, + **kwargs) + + class FastPurgeError(RuntimeError): """Raised when the Fast Purge API reports an error. @@ -74,6 +101,11 @@ class FastPurgeClient(object): # Default network matches Akamai's documented default DEFAULT_NETWORK = os.environ.get("FAST_PURGE_DEFAULT_NETWORK", "production") + # Max number of retries allowed for HTTP requests, and the backoff used + # to extend the delay between requests. + MAX_RETRIES = int(os.environ.get("FAST_PURGE_MAX_RETRIES", "10")) + + RETRY_BACKOFF = float(os.environ.get("FAST_PURGE_RETRY_BACKOFF", "0.15")) # Default purge type. # Akamai recommend "invalidate", so why is "delete" our default? # Here's what Akamai docs have to say: @@ -197,12 +229,32 @@ def __baseurl(self): return '{out}:{port}'.format(out=out, port=self.__port) + @property + def __retry_policy(self): + retries = getattr(self.__local, 'retries', None) + if not retries: + retries = LoggingRetry( + total=self.MAX_RETRIES, + backoff_factor=self.RETRY_BACKOFF, + # We strictly require 201 here since that's how the server + # tells us we queued something async, as expected + status_forcelist=[status.value for status in HTTPStatus + if status.value != 201], + allowed_methods={'POST'}, + logger=LOG, + ) + self.__local.retries = retries + return retries + @property def __session(self): session = getattr(self.__local, 'session', None) if not session: session = requests.Session() session.auth = EdgeGridAuth(**self.__auth) + session.mount(self.__baseurl, + HTTPAdapter(max_retries=self.__retry_policy)) + self.__local.session = session return session @@ -223,21 +275,16 @@ def __get_request_bodies(self, objects): def __start_purge(self, endpoint, request_body): headers = {'Content-Type': 'application/json'} LOG.debug("POST JSON of size %s to %s", len(request_body), endpoint) - - response = self.__session.post(endpoint, data=request_body, headers=headers) - - # Did it succeed? We strictly require 201 here since that's how the server tells - # us we queued something async, as expected - if response.status_code != 201: - message = "Request to {endpoint} failed: {r.status_code} {r.reason} {text}".\ - format(endpoint=endpoint, r=response, text=response.text[0:800]) + try: + response = self.__session.post(endpoint, data=request_body, headers=headers) + response_body = response.json() + estimated_seconds = response_body.get('estimatedSeconds', 5) + return Purge(response_body, monotonic() + estimated_seconds) + except RetryError as e: + message = "Request to {endpoint} was unsuccessful after {retries} retries: {reason}". \ + format(endpoint=endpoint, retries=self.MAX_RETRIES, reason=e.args[0].reason) LOG.debug("%s", message) - raise FastPurgeError(message) - - response_body = response.json() - estimated_seconds = response_body.get('estimatedSeconds', 5) - - return Purge(response_body, monotonic() + estimated_seconds) + raise FastPurgeError(message) from e def purge_objects(self, object_type, objects, **kwargs): """Purge a collection of objects. diff --git a/test-requirements.txt b/test-requirements.txt index 80f895c..9d8c11c 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -2,3 +2,4 @@ pytest requests-mock mock bandit==1.7.5;python_version > '3' +responses \ No newline at end of file diff --git a/tests/test_purge.py b/tests/test_purge.py index fbd5cd7..ce647a7 100644 --- a/tests/test_purge.py +++ b/tests/test_purge.py @@ -1,6 +1,7 @@ import pytest import requests_mock import mock +import responses try: from time import monotonic @@ -37,7 +38,7 @@ def requests_mocker(): @pytest.fixture -def no_retries(): +def no_thread_retries(): """Suppress retries for the duration of this fixture.""" with mock.patch('more_executors.retry.ExceptionRetryPolicy') as policy_class: @@ -131,20 +132,21 @@ def test_scheme_port(client_auth, requests_mocker): assert future.result() -def test_response_fails(client, requests_mocker, no_retries): +@responses.activate +def test_response_fails(client, no_thread_retries): """Requests fail with a FastPurgeError if API gives unsuccessful response.""" + url = 'https://fastpurge.example.com/ccu/v3/delete/cpcode/production' + # Decrease backoff, otherwise the test will run for 5 minutes + with pytest.MonkeyPatch.context() as mp: + mp.setenv("FAST_PURGE_RETRY_BACKOFF", "0.001") - requests_mocker.register_uri( - method='POST', - url='https://fastpurge.example.com/ccu/v3/delete/cpcode/production', - status_code=503, - reason='simulated internal error') - - future = client.purge_by_cpcode([1234, 5678]) - exception = future.exception() + responses.add(responses.POST, url, status=503, + content_type="application/json", body="Error") + future = client.purge_by_cpcode([1234, 5678]) + exception = future.exception() assert isinstance(exception, FastPurgeError) - assert '503 simulated internal error' in str(exception) + assert 'too many 503 error responses' in str(exception) def test_split_requests(client, requests_mocker): @@ -201,3 +203,25 @@ def test_multiple_clients_with_the_same_auth_dict(client_auth): client2 = FastPurgeClient(auth=client_auth) assert client1 is not client2 + + +@responses.activate(registry=responses.registries.OrderedRegistry) +def test_retries_on_error(client_auth): + """Sanity check for the retry functionality""" + url = 'http://fastpurge.example.com:42/ccu/v3/delete/tag/staging' + err_1 = responses.add(responses.POST, url, status=500, + content_type="application/json", body="Error") + err_2 = responses.add(responses.POST, url, status=501, + content_type="application/json", body="Error") + res = responses.add(responses.POST, url, status=201, + content_type="application/json", + json={'estimatedSeconds': 0.1}) + + client = FastPurgeClient(auth=client_auth, scheme='http', port=42) + + future = client.purge_by_tag(['red'], network='staging') + + assert future.result() + assert len(err_1.calls) == 1 + assert len(err_2.calls) == 1 + assert len(res.calls) == 1