Skip to content

Commit

Permalink
Add clearer retry logic with logging [RHELDST-9679]
Browse files Browse the repository at this point in the history
Poor retry logic in this library has made some pub tasks fail. The logs
are also unclear as to whether retries are occurring or not. This change
extends the urllib3 retry logic to include logging to resolve both
issues. It also introduces new envvars to make the retry periods
configurable. The default backoff settings retry the request 10 times
over a 5 minute period.
  • Loading branch information
amcmahon-rh committed Jan 29, 2024
1 parent 0471367 commit f282166
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 24 deletions.
75 changes: 61 additions & 14 deletions fastpurge/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
from threading import local, Lock

import logging
from http import HTTPStatus

import requests
from requests.adapters import HTTPAdapter
from requests.exceptions import RetryError
from urllib3.util import Retry

try:
from time import monotonic
Expand All @@ -32,6 +36,29 @@
])


class LoggingRetry(Retry):
    """A urllib3 ``Retry`` subclass which logs every failed attempt.

    Accepts one extra keyword argument, ``logger``; all other arguments
    are passed through to ``Retry`` unchanged.
    """

    def __init__(self, *args, **kwargs):
        # Fall back to a module logger so increment() cannot crash with
        # AttributeError when no logger kwarg was supplied.
        self._logger = kwargs.pop('logger', None) or logging.getLogger(__name__)
        super(LoggingRetry, self).__init__(*args, **kwargs)

    def new(self, **kw):
        # Retry objects are copied (via new()) for each attempt; propagate
        # our logger into every copy so later attempts keep logging.
        kw['logger'] = self._logger
        return super(LoggingRetry, self).new(**kw)

    def increment(self, method, url, *args, **kwargs):
        """Log the failure that triggered this retry, then delegate to Retry."""
        response = kwargs.get("response")
        if response:
            # Lazy %-style logger args: formatting is deferred until the
            # record is actually emitted.
            self._logger.error(
                "An invalid status code %s was received "
                "when trying to %s to %s: %s",
                response.status, method, url, response.reason)
        else:  # pragma: no cover
            self._logger.error(
                "An unknown error occurred when trying to %s to %s",
                method, url)
        return super(LoggingRetry, self).increment(method, url, *args,
                                                   **kwargs)


class FastPurgeError(RuntimeError):
"""Raised when the Fast Purge API reports an error.
Expand Down Expand Up @@ -74,6 +101,11 @@ class FastPurgeClient(object):
# Default network matches Akamai's documented default
DEFAULT_NETWORK = os.environ.get("FAST_PURGE_DEFAULT_NETWORK", "production")

# Max number of retries allowed for HTTP requests, and the backoff used
# to extend the delay between requests.
MAX_RETRIES = int(os.environ.get("FAST_PURGE_MAX_RETRIES", "10"))

# Multiplier for urllib3's exponential backoff between retry attempts.
# NOTE(review): with the defaults (10 retries, factor 0.15) the total
# retry window is on the order of minutes — confirm against urllib3's
# backoff formula if exact timing matters.
RETRY_BACKOFF = float(os.environ.get("FAST_PURGE_RETRY_BACKOFF", "0.15"))
# Default purge type.
# Akamai recommend "invalidate", so why is "delete" our default?
# Here's what Akamai docs have to say:
Expand Down Expand Up @@ -197,12 +229,32 @@ def __baseurl(self):

return '{out}:{port}'.format(out=out, port=self.__port)

@property
def __retry_policy(self):
    """Return the thread-local LoggingRetry policy, building it lazily."""
    cached = getattr(self.__local, 'retries', None)
    if cached:
        return cached

    # Force a retry on every status except 201: the server signals a
    # successfully queued async purge strictly via 201.
    retry_statuses = [code.value for code in HTTPStatus if code.value != 201]

    policy = LoggingRetry(
        total=self.MAX_RETRIES,
        backoff_factor=self.RETRY_BACKOFF,
        status_forcelist=retry_statuses,
        allowed_methods={'POST'},
        logger=LOG,
    )
    self.__local.retries = policy
    return policy

@property
def __session(self):
    """Return the thread-local requests session, creating it on first use."""
    cached = getattr(self.__local, 'session', None)
    if cached:
        return cached

    session = requests.Session()
    session.auth = EdgeGridAuth(**self.__auth)
    # Mount a retry-enabled adapter so requests to our base URL follow
    # the configured retry policy.
    session.mount(self.__baseurl,
                  HTTPAdapter(max_retries=self.__retry_policy))
    self.__local.session = session
    return session

Expand All @@ -223,22 +275,17 @@ def __get_request_bodies(self, objects):
def __start_purge(self, endpoint, request_body):
    """POST a purge request and return a Purge for the queued operation.

    Arguments:
        endpoint: full URL of the purge API endpoint.
        request_body: JSON-encoded request payload (str/bytes).

    Returns:
        Purge holding the parsed response body and an estimated
        completion time (monotonic clock).

    Raises:
        FastPurgeError: if the request still fails after MAX_RETRIES
            retries (urllib3 raises RetryError once retries are
            exhausted).
    """
    headers = {'Content-Type': 'application/json'}
    LOG.debug("POST JSON of size %s to %s", len(request_body), endpoint)

    try:
        response = self.__session.post(endpoint, data=request_body,
                                       headers=headers)
        response_body = response.json()
        # The server hints how long the purge should take; fall back
        # to 5 seconds if it doesn't say.
        estimated_seconds = response_body.get('estimatedSeconds', 5)
        return Purge(response_body, monotonic() + estimated_seconds)
    except RetryError as e:
        message = "Request to {endpoint} was unsuccessful after {retries} retries: {reason}". \
            format(endpoint=endpoint, retries=self.MAX_RETRIES, reason=e.args[0].reason)
        LOG.debug(message)
        raise FastPurgeError(message)

def purge_objects(self, object_type, objects, **kwargs):
"""Purge a collection of objects.
Expand Down
1 change: 1 addition & 0 deletions test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ pytest
requests-mock
mock
bandit==1.7.5;python_version > '3'
responses
43 changes: 33 additions & 10 deletions tests/test_purge.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import pytest
import requests_mock
import mock
import responses
import os

try:
from time import monotonic
Expand Down Expand Up @@ -37,7 +39,7 @@ def requests_mocker():


@pytest.fixture
def no_retries():
def no_thread_retries():
"""Suppress retries for the duration of this fixture."""

with mock.patch('more_executors.retry.ExceptionRetryPolicy') as policy_class:
Expand Down Expand Up @@ -131,20 +133,19 @@ def test_scheme_port(client_auth, requests_mocker):
assert future.result()


@responses.activate
def test_response_fails(client, no_thread_retries):
    """Requests fail with a FastPurgeError if API gives unsuccessful response."""
    url = 'https://fastpurge.example.com/ccu/v3/delete/cpcode/production'

    # Decrease backoff, otherwise the test will run for 5 minutes.
    # NOTE(review): FAST_PURGE_RETRY_BACKOFF is read when the class body
    # is evaluated at import time, so setting it here may have no effect
    # and it leaks into later tests — confirm, and prefer
    # mock.patch.dict(os.environ, ...) if the override is really needed.
    os.environ["FAST_PURGE_RETRY_BACKOFF"] = "0.001"

    # Every attempt gets a 503 so retries are exhausted.
    responses.add(responses.POST, url, status=503,
                  content_type="application/json", body="Error")

    future = client.purge_by_cpcode([1234, 5678])
    exception = future.exception()

    assert isinstance(exception, FastPurgeError)
    assert 'too many 503 error responses' in str(exception)


def test_split_requests(client, requests_mocker):
Expand Down Expand Up @@ -201,3 +202,25 @@ def test_multiple_clients_with_the_same_auth_dict(client_auth):
client2 = FastPurgeClient(auth=client_auth)

assert client1 is not client2


@responses.activate(registry=responses.registries.OrderedRegistry)
def test_retries_on_error(client_auth):
    """Sanity check for the retry functionality"""
    url = 'http://fastpurge.example.com:42/ccu/v3/delete/tag/staging'

    # Two failing responses followed by a success; the OrderedRegistry
    # serves them strictly in registration order.
    failures = [
        responses.add(responses.POST, url, status=code,
                      content_type="application/json", body="Error")
        for code in (500, 501)
    ]
    success = responses.add(responses.POST, url, status=201,
                            content_type="application/json",
                            json={'estimatedSeconds': 0.1})

    client = FastPurgeClient(auth=client_auth, scheme='http', port=42)
    future = client.purge_by_tag(['red'], network='staging')

    # The purge eventually succeeds, and each mock was hit exactly once.
    assert future.result()
    for rsp in failures:
        assert len(rsp.calls) == 1
    assert len(success.calls) == 1

0 comments on commit f282166

Please sign in to comment.