
Commit d2b47e8
1 parent 0471367

Add clearer retry logic with logging [RHELDST-9679]

Poor retry logic in this library has caused some pub tasks to fail, and the logs do not make it clear whether retries are happening at all. This change extends the urllib3 retry logic with logging to address both issues. It also introduces new environment variables to make the retry periods configurable. The default backoff settings retry the request 10 times over a 5 minute period.
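
A note on using the new settings (illustrative, not part of the commit): both environment variables are read at import time, when the FastPurgeClient class body runs, so they have to be exported before fastpurge is imported. The auth values and the purge call below are placeholders in the style of the project README.

    # Hedged usage sketch for the new retry knobs; credential values are placeholders.
    import os

    # Must be set before `fastpurge` is imported: the class attributes read them
    # via os.environ.get() when the module loads.
    os.environ["FAST_PURGE_MAX_RETRIES"] = "5"      # default: 10
    os.environ["FAST_PURGE_RETRY_BACKOFF"] = "0.3"  # default: 0.15

    from fastpurge import FastPurgeClient

    client = FastPurgeClient(auth={
        "client_secret": "...",  # Akamai EdgeGrid credentials (placeholders)
        "host": "akaa-xxxxxxxx.purge.akamaiapis.net",
        "access_token": "...",
        "client_token": "...",
    })
    future = client.purge_by_cpcode([1234, 5678])  # returns a Future, as in the tests below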

File tree (3 files changed, +94 -23 lines):

  fastpurge/_client.py
  test-requirements.txt
  tests/test_purge.py

fastpurge/_client.py  +61 -14

@@ -6,6 +6,10 @@
 from threading import local, Lock
 
 import requests
+from requests.adapters import HTTPAdapter
+from requests.exceptions import RetryError
+from urllib3.util import Retry
+from http import HTTPStatus
 
 try:
     from time import monotonic
@@ -32,6 +36,29 @@
 ])
 
 
+class LoggingRetry(Retry):
+    def __init__(self, *args, **kwargs):
+        self._logger = kwargs.pop('logger', None)
+        super(LoggingRetry, self).__init__(*args, **kwargs)
+
+    def new(self, **kw):
+        kw['logger'] = self._logger
+        return super(LoggingRetry, self).new(**kw)
+
+    def increment(self, method, url, *args, **kwargs):
+        response = kwargs.get("response")
+        if response:
+            self._logger.error("An invalid status code %s was received "
+                               "when trying to %s to %s: %s",
+                               response.status, method, url, response.reason)
+        else:  # pragma: no cover
+            self._logger.error(
+                "An unknown error occurred when trying to %s to %s", method,
+                url)
+        return super(LoggingRetry, self).increment(method, url, *args,
+                                                   **kwargs)
+
+
 class FastPurgeError(RuntimeError):
     """Raised when the Fast Purge API reports an error.
 
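To make the new logging concrete (illustrative only, not captured from a real run): increment() logs with plain %-style formatting, so a retried 503 would be reported roughly as follows, with status, method, URL and reason taken from the failed attempt.

    # Illustrative rendering of the message logged by LoggingRetry.increment();
    # the status, URL and reason values are placeholders.
    msg = ("An invalid status code %s was received "
           "when trying to %s to %s: %s")
    print(msg % (503, "POST", "/ccu/v3/delete/cpcode/production", "Service Unavailable"))
    # -> An invalid status code 503 was received when trying to POST to
    #    /ccu/v3/delete/cpcode/production: Service Unavailable
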
@@ -74,6 +101,11 @@ class FastPurgeClient(object):
     # Default network matches Akamai's documented default
     DEFAULT_NETWORK = os.environ.get("FAST_PURGE_DEFAULT_NETWORK", "production")
 
+    # Max number of retries allowed for HTTP requests, and the backoff used
+    # to extend the delay between requests.
+    MAX_RETRIES = int(os.environ.get("FAST_PURGE_MAX_RETRIES", "10"))
+
+    RETRY_BACKOFF = float(os.environ.get("FAST_PURGE_RETRY_BACKOFF", "0.15"))
     # Default purge type.
     # Akamai recommend "invalidate", so why is "delete" our default?
     # Here's what Akamai docs have to say:
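
As a rough guide to how these two settings interact (not part of the commit): urllib3 documents the sleep between attempts as backoff_factor * 2 ** (retry - 1), capped at 120 seconds by default, with the exact handling of the first retry differing between urllib3 versions. Under that formula the defaults above sleep for roughly 2.5 minutes across 10 retries; the remainder of the five-minute window quoted in the commit message is presumably spent in the HTTP attempts themselves.

    # Back-of-envelope sketch of the sleep schedule implied by the defaults,
    # assuming urllib3's documented backoff formula; real elapsed time also
    # includes each HTTP attempt, and first-retry handling varies by version.
    BACKOFF_FACTOR = 0.15   # FAST_PURGE_RETRY_BACKOFF default
    MAX_RETRIES = 10        # FAST_PURGE_MAX_RETRIES default
    BACKOFF_MAX = 120       # urllib3's default cap on a single sleep

    delays = [0.0 if n == 1 else min(BACKOFF_MAX, BACKOFF_FACTOR * 2 ** (n - 1))
              for n in range(1, MAX_RETRIES + 1)]
    print(delays)        # [0.0, 0.3, 0.6, 1.2, 2.4, 4.8, 9.6, 19.2, 38.4, 76.8]
    print(sum(delays))   # ~153 seconds of sleeping across 10 retries
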
@@ -197,12 +229,32 @@ def __baseurl(self):
 
         return '{out}:{port}'.format(out=out, port=self.__port)
 
+    @property
+    def __retry_policy(self):
+        retries = getattr(self.__local, 'retries', None)
+        if not retries:
+            retries = LoggingRetry(
+                total=self.MAX_RETRIES,
+                backoff_factor=self.RETRY_BACKOFF,
+                # We strictly require 201 here since that's how the server
+                # tells us we queued something async, as expected
+                status_forcelist=[status.value for status in HTTPStatus
+                                  if status.value != 201],
+                allowed_methods={'POST'},
+                logger=LOG,
+            )
+            self.__local.retries = retries
+        return retries
+
     @property
     def __session(self):
         session = getattr(self.__local, 'session', None)
         if not session:
             session = requests.Session()
             session.auth = EdgeGridAuth(**self.__auth)
+            session.mount(self.__baseurl,
+                          HTTPAdapter(max_retries=self.__retry_policy))
+
             self.__local.session = session
         return session
 
@@ -223,21 +275,16 @@ def __get_request_bodies(self, objects):
     def __start_purge(self, endpoint, request_body):
         headers = {'Content-Type': 'application/json'}
         LOG.debug("POST JSON of size %s to %s", len(request_body), endpoint)
-
-        response = self.__session.post(endpoint, data=request_body, headers=headers)
-
-        # Did it succeed? We strictly require 201 here since that's how the server tells
-        # us we queued something async, as expected
-        if response.status_code != 201:
-            message = "Request to {endpoint} failed: {r.status_code} {r.reason} {text}".\
-                format(endpoint=endpoint, r=response, text=response.text[0:800])
+        try:
+            response = self.__session.post(endpoint, data=request_body, headers=headers)
+            response_body = response.json()
+            estimated_seconds = response_body.get('estimatedSeconds', 5)
+            return Purge(response_body, monotonic() + estimated_seconds)
+        except RetryError as e:
+            message = "Request to {endpoint} was unsuccessful after {retries} retries: {reason}". \
+                format(endpoint=endpoint, retries=self.MAX_RETRIES, reason=e.args[0].reason)
             LOG.debug("%s", message)
-            raise FastPurgeError(message)
-
-        response_body = response.json()
-        estimated_seconds = response_body.get('estimatedSeconds', 5)
-
-        return Purge(response_body, monotonic() + estimated_seconds)
+            raise FastPurgeError(message) from e
 
     def purge_objects(self, object_type, objects, **kwargs):
         """Purge a collection of objects.

test-requirements.txt  +1

@@ -2,3 +2,4 @@ pytest
 requests-mock
 mock
 bandit==1.7.5;python_version > '3'
+responses

tests/test_purge.py  +32 -9

@@ -1,6 +1,7 @@
 import pytest
 import requests_mock
 import mock
+import responses
 
 try:
     from time import monotonic
@@ -37,7 +38,7 @@ def requests_mocker():
 
 
 @pytest.fixture
-def no_retries():
+def no_thread_retries():
     """Suppress retries for the duration of this fixture."""
 
     with mock.patch('more_executors.retry.ExceptionRetryPolicy') as policy_class:
@@ -131,20 +132,20 @@ def test_scheme_port(client_auth, requests_mocker):
     assert future.result()
 
 
-def test_response_fails(client, requests_mocker, no_retries):
+@responses.activate
+def test_response_fails(client, no_thread_retries, monkeypatch):
     """Requests fail with a FastPurgeError if API gives unsuccessful response."""
+    url = 'https://fastpurge.example.com/ccu/v3/delete/cpcode/production'
+    # Decrease backoff, otherwise the test will run for 5 minutes
+    monkeypatch.setenv("FAST_PURGE_RETRY_BACKOFF", "0.001")
 
-    requests_mocker.register_uri(
-        method='POST',
-        url='https://fastpurge.example.com/ccu/v3/delete/cpcode/production',
-        status_code=503,
-        reason='simulated internal error')
-
+    responses.add(responses.POST, url, status=503,
+                  content_type="application/json", body="Error")
     future = client.purge_by_cpcode([1234, 5678])
     exception = future.exception()
 
     assert isinstance(exception, FastPurgeError)
-    assert '503 simulated internal error' in str(exception)
+    assert 'too many 503 error responses' in str(exception)
 
 
 def test_split_requests(client, requests_mocker):
@@ -201,3 +202,25 @@ def test_multiple_clients_with_the_same_auth_dict(client_auth):
     client2 = FastPurgeClient(auth=client_auth)
 
     assert client1 is not client2
+
+
+@responses.activate(registry=responses.registries.OrderedRegistry)
+def test_retries_on_error(client_auth):
+    """Sanity check for the retry functionality"""
+    url = 'http://fastpurge.example.com:42/ccu/v3/delete/tag/staging'
+    err_1 = responses.add(responses.POST, url, status=500,
+                          content_type="application/json", body="Error")
+    err_2 = responses.add(responses.POST, url, status=501,
+                          content_type="application/json", body="Error")
+    res = responses.add(responses.POST, url, status=201,
+                        content_type="application/json",
+                        json={'estimatedSeconds': 0.1})
+
+    client = FastPurgeClient(auth=client_auth, scheme='http', port=42)
+
+    future = client.purge_by_tag(['red'], network='staging')
+
+    assert future.result()
+    assert len(err_1.calls) == 1
+    assert len(err_2.calls) == 1
+    assert len(res.calls) == 1
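
For reference (not part of the commit), responses' OrderedRegistry is what lets this test hand a different canned response to each successive attempt, so the two error stubs and the final 201 are each consumed exactly once. A minimal standalone illustration using only the documented responses API:

    import requests
    import responses
    from responses import registries

    @responses.activate(registry=registries.OrderedRegistry)
    def demo():
        # Stubs are served strictly in registration order.
        responses.add(responses.GET, "http://example.test/x", status=503)
        responses.add(responses.GET, "http://example.test/x", status=200, json={"ok": True})
        assert requests.get("http://example.test/x").status_code == 503
        assert requests.get("http://example.test/x").status_code == 200

    demo()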
