From ecdcc010570d4d4dc0cfaac3a498bc3b2cdd5559 Mon Sep 17 00:00:00 2001 From: Stainless Bot Date: Mon, 13 Jan 2025 16:19:56 +0000 Subject: [PATCH] chore(internal): streaming refactors --- src/openai/_streaming.py | 66 +++++++++++++++++++--------------------- 1 file changed, 32 insertions(+), 34 deletions(-) diff --git a/src/openai/_streaming.py b/src/openai/_streaming.py index 3a5c9571a1..7aa7b62f6b 100644 --- a/src/openai/_streaming.py +++ b/src/openai/_streaming.py @@ -59,23 +59,22 @@ def __stream__(self) -> Iterator[_T]: if sse.data.startswith("[DONE]"): break - if sse.event is None: - data = sse.json() - if is_mapping(data) and data.get("error"): - message = None - error = data.get("error") - if is_mapping(error): - message = error.get("message") - if not message or not isinstance(message, str): - message = "An error occurred during streaming" - - raise APIError( - message=message, - request=self.response.request, - body=data["error"], - ) - - yield process_data(data=data, cast_to=cast_to, response=response) + data = sse.json() + if is_mapping(data) and data.get("error"): + message = None + error = data.get("error") + if is_mapping(error): + message = error.get("message") + if not message or not isinstance(message, str): + message = "An error occurred during streaming" + + raise APIError( + message=message, + request=self.response.request, + body=data["error"], + ) + + yield process_data(data=data, cast_to=cast_to, response=response) # Ensure the entire stream is consumed for _sse in iterator: @@ -142,23 +141,22 @@ async def __stream__(self) -> AsyncIterator[_T]: if sse.data.startswith("[DONE]"): break - if sse.event is None: - data = sse.json() - if is_mapping(data) and data.get("error"): - message = None - error = data.get("error") - if is_mapping(error): - message = error.get("message") - if not message or not isinstance(message, str): - message = "An error occurred during streaming" - - raise APIError( - message=message, - request=self.response.request, - body=data["error"], - ) - - yield process_data(data=data, cast_to=cast_to, response=response) + data = sse.json() + if is_mapping(data) and data.get("error"): + message = None + error = data.get("error") + if is_mapping(error): + message = error.get("message") + if not message or not isinstance(message, str): + message = "An error occurred during streaming" + + raise APIError( + message=message, + request=self.response.request, + body=data["error"], + ) + + yield process_data(data=data, cast_to=cast_to, response=response) # Ensure the entire stream is consumed async for _sse in iterator: