Skip to content

Commit e025e6b

Browse files
fix(client): ensure retried requests are closed (#902)
1 parent 71a13d0 commit e025e6b

File tree

3 files changed

+302
-21
lines changed

3 files changed

+302
-21
lines changed

src/openai/_base_client.py

+80-20
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
DEFAULT_TIMEOUT,
7373
DEFAULT_MAX_RETRIES,
7474
RAW_RESPONSE_HEADER,
75+
STREAMED_RAW_RESPONSE_HEADER,
7576
)
7677
from ._streaming import Stream, AsyncStream
7778
from ._exceptions import (
@@ -363,14 +364,21 @@ def _make_status_error_from_response(
363364
self,
364365
response: httpx.Response,
365366
) -> APIStatusError:
366-
err_text = response.text.strip()
367-
body = err_text
367+
if response.is_closed and not response.is_stream_consumed:
368+
# We can't read the response body as it has been closed
369+
# before it was read. This can happen if an event hook
370+
# raises a status error.
371+
body = None
372+
err_msg = f"Error code: {response.status_code}"
373+
else:
374+
err_text = response.text.strip()
375+
body = err_text
368376

369-
try:
370-
body = json.loads(err_text)
371-
err_msg = f"Error code: {response.status_code} - {body}"
372-
except Exception:
373-
err_msg = err_text or f"Error code: {response.status_code}"
377+
try:
378+
body = json.loads(err_text)
379+
err_msg = f"Error code: {response.status_code} - {body}"
380+
except Exception:
381+
err_msg = err_text or f"Error code: {response.status_code}"
374382

375383
return self._make_status_error(err_msg, body=body, response=response)
376384

@@ -534,6 +542,12 @@ def _process_response_data(
534542
except pydantic.ValidationError as err:
535543
raise APIResponseValidationError(response=response, body=data) from err
536544

545+
def _should_stream_response_body(self, *, request: httpx.Request) -> bool:
546+
if request.headers.get(STREAMED_RAW_RESPONSE_HEADER) == "true":
547+
return True
548+
549+
return False
550+
537551
@property
538552
def qs(self) -> Querystring:
539553
return Querystring()
@@ -606,7 +620,7 @@ def _calculate_retry_timeout(
606620
if response_headers is not None:
607621
retry_header = response_headers.get("retry-after")
608622
try:
609-
retry_after = int(retry_header)
623+
retry_after = float(retry_header)
610624
except Exception:
611625
retry_date_tuple = email.utils.parsedate_tz(retry_header)
612626
if retry_date_tuple is None:
@@ -862,14 +876,21 @@ def _request(
862876
request = self._build_request(options)
863877
self._prepare_request(request)
864878

879+
response = None
880+
865881
try:
866-
response = self._client.send(request, auth=self.custom_auth, stream=stream)
882+
response = self._client.send(
883+
request,
884+
auth=self.custom_auth,
885+
stream=stream or self._should_stream_response_body(request=request),
886+
)
867887
log.debug(
868888
'HTTP Request: %s %s "%i %s"', request.method, request.url, response.status_code, response.reason_phrase
869889
)
870890
response.raise_for_status()
871891
except httpx.HTTPStatusError as err: # thrown on 4xx and 5xx status code
872892
if retries > 0 and self._should_retry(err.response):
893+
err.response.close()
873894
return self._retry_request(
874895
options,
875896
cast_to,
@@ -881,27 +902,39 @@ def _request(
881902

882903
# If the response is streamed then we need to explicitly read the response
883904
# to completion before attempting to access the response text.
884-
err.response.read()
905+
if not err.response.is_closed:
906+
err.response.read()
907+
885908
raise self._make_status_error_from_response(err.response) from None
886909
except httpx.TimeoutException as err:
910+
if response is not None:
911+
response.close()
912+
887913
if retries > 0:
888914
return self._retry_request(
889915
options,
890916
cast_to,
891917
retries,
892918
stream=stream,
893919
stream_cls=stream_cls,
920+
response_headers=response.headers if response is not None else None,
894921
)
922+
895923
raise APITimeoutError(request=request) from err
896924
except Exception as err:
925+
if response is not None:
926+
response.close()
927+
897928
if retries > 0:
898929
return self._retry_request(
899930
options,
900931
cast_to,
901932
retries,
902933
stream=stream,
903934
stream_cls=stream_cls,
935+
response_headers=response.headers if response is not None else None,
904936
)
937+
905938
raise APIConnectionError(request=request) from err
906939

907940
return self._process_response(
@@ -917,7 +950,7 @@ def _retry_request(
917950
options: FinalRequestOptions,
918951
cast_to: Type[ResponseT],
919952
remaining_retries: int,
920-
response_headers: Optional[httpx.Headers] = None,
953+
response_headers: httpx.Headers | None,
921954
*,
922955
stream: bool,
923956
stream_cls: type[_StreamT] | None,
@@ -1303,14 +1336,21 @@ async def _request(
13031336
request = self._build_request(options)
13041337
await self._prepare_request(request)
13051338

1339+
response = None
1340+
13061341
try:
1307-
response = await self._client.send(request, auth=self.custom_auth, stream=stream)
1342+
response = await self._client.send(
1343+
request,
1344+
auth=self.custom_auth,
1345+
stream=stream or self._should_stream_response_body(request=request),
1346+
)
13081347
log.debug(
13091348
'HTTP Request: %s %s "%i %s"', request.method, request.url, response.status_code, response.reason_phrase
13101349
)
13111350
response.raise_for_status()
13121351
except httpx.HTTPStatusError as err: # thrown on 4xx and 5xx status code
13131352
if retries > 0 and self._should_retry(err.response):
1353+
await err.response.aclose()
13141354
return await self._retry_request(
13151355
options,
13161356
cast_to,
@@ -1322,19 +1362,39 @@ async def _request(
13221362

13231363
# If the response is streamed then we need to explicitly read the response
13241364
# to completion before attempting to access the response text.
1325-
await err.response.aread()
1365+
if not err.response.is_closed:
1366+
await err.response.aread()
1367+
13261368
raise self._make_status_error_from_response(err.response) from None
1327-
except httpx.ConnectTimeout as err:
1328-
if retries > 0:
1329-
return await self._retry_request(options, cast_to, retries, stream=stream, stream_cls=stream_cls)
1330-
raise APITimeoutError(request=request) from err
13311369
except httpx.TimeoutException as err:
1370+
if response is not None:
1371+
await response.aclose()
1372+
13321373
if retries > 0:
1333-
return await self._retry_request(options, cast_to, retries, stream=stream, stream_cls=stream_cls)
1374+
return await self._retry_request(
1375+
options,
1376+
cast_to,
1377+
retries,
1378+
stream=stream,
1379+
stream_cls=stream_cls,
1380+
response_headers=response.headers if response is not None else None,
1381+
)
1382+
13341383
raise APITimeoutError(request=request) from err
13351384
except Exception as err:
1385+
if response is not None:
1386+
await response.aclose()
1387+
13361388
if retries > 0:
1337-
return await self._retry_request(options, cast_to, retries, stream=stream, stream_cls=stream_cls)
1389+
return await self._retry_request(
1390+
options,
1391+
cast_to,
1392+
retries,
1393+
stream=stream,
1394+
stream_cls=stream_cls,
1395+
response_headers=response.headers if response is not None else None,
1396+
)
1397+
13381398
raise APIConnectionError(request=request) from err
13391399

13401400
return self._process_response(
@@ -1350,7 +1410,7 @@ async def _retry_request(
13501410
options: FinalRequestOptions,
13511411
cast_to: Type[ResponseT],
13521412
remaining_retries: int,
1353-
response_headers: Optional[httpx.Headers] = None,
1413+
response_headers: httpx.Headers | None,
13541414
*,
13551415
stream: bool,
13561416
stream_cls: type[_AsyncStreamT] | None,

src/openai/_constants.py

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import httpx
44

55
RAW_RESPONSE_HEADER = "X-Stainless-Raw-Response"
6+
STREAMED_RAW_RESPONSE_HEADER = "X-Stainless-Streamed-Raw-Response"
67

78
# default timeout is 10 minutes
89
DEFAULT_TIMEOUT = httpx.Timeout(timeout=600.0, connect=5.0)

0 commit comments

Comments
 (0)