Skip to content

Commit 381cf48

Browse files
stainless-bot authored and RobertCraigie committed
feat(api): add messages endpoint with streaming helpers (anthropics#286)
More information here: https://docs.anthropic.com/claude/reference/messages_post
1 parent bd84369 commit 381cf48

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+3291
-198
lines changed

.stats.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
configured_endpoints: 1
1+
configured_endpoints: 2

README.md

+38
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,44 @@ async for completion in stream:
170170
print(completion.completion, end="", flush=True)
171171
```
172172

173+
### Streaming Helpers
174+
175+
This library provides several conveniences for streaming messages, for example:
176+
177+
```py
178+
import asyncio
179+
from anthropic import AsyncAnthropic
180+
181+
client = AsyncAnthropic()
182+
183+
async def main() -> None:
184+
async with client.beta.messages.stream(
185+
max_tokens=1024,
186+
messages=[
187+
{
188+
"role": "user",
189+
"content": "Say hello there!",
190+
}
191+
],
192+
model="claude-2.1",
193+
) as stream:
194+
async for text in stream.text_stream:
195+
print(text, end="", flush=True)
196+
print()
197+
198+
# you can still get the accumulated final message outside of
199+
# the context manager, as long as the entire stream was consumed
200+
# inside of the context manager
201+
accumulated = await stream.get_final_message()
202+
print(accumulated.model_dump_json(indent=2))
203+
204+
asyncio.run(main())
205+
```
206+
207+
Streaming with `client.beta.messages.stream(...)` exposes [various helpers for your convenience](helpers.md) including event handlers and accumulation.
208+
209+
Alternatively, you can use `client.beta.messages.create(..., stream=True)` which only returns an async iterable of the events in the stream and thus uses less memory (it does not build up a final message object for you).
210+
173211
## Token counting
174212

175213
You can estimate billing for a given request with the `client.count_tokens()` method, eg:

api.md

+28
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,31 @@ from anthropic.types import Completion
1515
Methods:
1616

1717
- <code title="post /v1/complete">client.completions.<a href="./src/anthropic/resources/completions.py">create</a>(\*\*<a href="src/anthropic/types/completion_create_params.py">params</a>) -> <a href="./src/anthropic/types/completion.py">Completion</a></code>
18+
19+
# Beta
20+
21+
## Messages
22+
23+
Types:
24+
25+
```python
26+
from anthropic.types.beta import (
27+
ContentBlock,
28+
ContentBlockDeltaEvent,
29+
ContentBlockStartEvent,
30+
ContentBlockStopEvent,
31+
Message,
32+
MessageDeltaEvent,
33+
MessageParam,
34+
MessageStartEvent,
35+
MessageStopEvent,
36+
MessageStreamEvent,
37+
TextBlock,
38+
TextDelta,
39+
)
40+
```
41+
42+
Methods:
43+
44+
- <code title="post /v1/messages">client.beta.messages.<a href="./src/anthropic/resources/beta/messages.py">create</a>(\*\*<a href="src/anthropic/types/beta/message_create_params.py">params</a>) -> <a href="./src/anthropic/types/beta/message.py">Message</a></code>
45+
- <code>client.beta.messages.<a href="./src/anthropic/resources/beta/messages.py">stream</a>(\*args) -> MessageStreamManager[MessageStream] | MessageStreamManager[MessageStreamT]</code>

examples/messages_stream.py

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import asyncio
2+
3+
from anthropic import AsyncAnthropic
4+
5+
client = AsyncAnthropic()
6+
7+
8+
async def main() -> None:
9+
async with client.beta.messages.stream(
10+
max_tokens=1024,
11+
messages=[
12+
{
13+
"role": "user",
14+
"content": "Say hello there!",
15+
}
16+
],
17+
model="claude-2.1",
18+
) as stream:
19+
async for text in stream.text_stream:
20+
print(text, end="", flush=True)
21+
print()
22+
23+
# you can still get the accumulated final message outside of
24+
# the context manager, as long as the entire stream was consumed
25+
# inside of the context manager
26+
accumulated = await stream.get_final_message()
27+
print("accumulated message: ", accumulated.model_dump_json(indent=2))
28+
29+
30+
asyncio.run(main())

examples/messages_stream_handler.py

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import asyncio
2+
from typing_extensions import override
3+
4+
from anthropic import AsyncAnthropic
5+
from anthropic.types.beta import MessageStreamEvent
6+
from anthropic.lib.streaming import AsyncMessageStream
7+
8+
client = AsyncAnthropic()
9+
10+
11+
class MyStream(AsyncMessageStream):
12+
@override
13+
async def on_stream_event(self, event: MessageStreamEvent) -> None:
14+
print("on_event fired with:", event)
15+
16+
17+
async def main() -> None:
18+
async with client.beta.messages.stream(
19+
max_tokens=1024,
20+
messages=[
21+
{
22+
"role": "user",
23+
"content": "Say hello there!",
24+
}
25+
],
26+
model="claude-2.1",
27+
event_handler=MyStream,
28+
) as stream:
29+
accumulated = await stream.get_final_message()
30+
print("accumulated message: ", accumulated.model_dump_json(indent=2))
31+
32+
33+
asyncio.run(main())

helpers.md

+103
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
# Message Helpers
2+
3+
## Streaming Responses
4+
5+
```python
6+
async with client.beta.messages.stream(
7+
max_tokens=1024,
8+
messages=[
9+
{
10+
"role": "user",
11+
"content": "Say hello there!",
12+
}
13+
],
14+
model="claude-2.1",
15+
) as stream:
16+
async for text in stream.text_stream:
17+
print(text, end="", flush=True)
18+
print()
19+
```
20+
21+
`client.beta.messages.stream()` returns a `MessageStreamManager`, which is a context manager that yields a `MessageStream` which is iterable, emits events and accumulates messages.
22+
23+
Alternatively, you can use `client.beta.messages.create(..., stream=True)` which returns an
23+
iterable of the events in the stream and uses less memory (most notably, it does not accumulate a final message
24+
object for you).
26+
27+
The stream will be cancelled when the context manager exits but you can also close it prematurely by calling `stream.close()`.
28+
29+
See an example of streaming helpers in action in [`examples/messages_stream.py`](examples/messages_stream.py) and defining custom event handlers in [`examples/messages_stream_handler.py`](examples/messages_stream_handler.py)
30+
31+
> [!NOTE]
32+
> The synchronous client has the same interface just without `async/await`.
33+
34+
### Lenses
35+
36+
#### `.text_stream`
37+
38+
Provides an iterator over just the text deltas in the stream:
39+
40+
```py
41+
async for text in stream.text_stream:
42+
print(text, end="", flush=True)
43+
print()
44+
```
45+
46+
### Events
47+
48+
#### `await on_stream_event(event: MessageStreamEvent)`
49+
50+
The event is fired when an event is received from the API.
51+
52+
#### `await on_message(message: Message)`
53+
54+
The event is fired when a full Message object has been accumulated. This corresponds to the `message_stop` SSE.
55+
56+
#### `await on_content_block(content_block: ContentBlock)`
57+
58+
The event is fired when a full ContentBlock object has been accumulated. This corresponds to the `content_block_stop` SSE.
59+
60+
#### `await on_text(text: str, snapshot: str)`
61+
62+
The event is fired when a `text` ContentBlock object is being accumulated. The first argument is the text delta and the second is the current accumulated text, for example:
63+
64+
```py
65+
on_text('Hello', 'Hello')
66+
on_text(' there', 'Hello there')
67+
on_text('!', 'Hello there!')
68+
```
69+
70+
This corresponds to the `content_block_delta` SSE.
71+
72+
#### `await on_exception(exception: Exception)`
73+
74+
The event is fired when an exception is encountered while streaming the response.
75+
76+
#### `await on_timeout()`
77+
78+
The event is fired when the request times out.
79+
80+
#### `await on_end()`
81+
82+
The last event fired in the stream.
83+
84+
### Methods
85+
86+
#### `await .close()`
87+
88+
Aborts the request.
89+
90+
#### `await .until_done()`
91+
92+
Blocks until the stream has been read to completion.
93+
94+
#### `await .get_final_message()`
95+
96+
Blocks until the stream has been read to completion and returns the accumulated `Message` object.
97+
98+
#### `await .get_final_text()`
99+
100+
> [!NOTE]
101+
> Currently the API will only ever return 1 content block
102+
103+
Blocks until the stream has been read to completion and returns all `text` content blocks concatenated together.

src/anthropic/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@
6969
"AI_PROMPT",
7070
]
7171

72+
from .lib.streaming import *
73+
7274
_setup_logging()
7375

7476
# Update the __module__ attribute for exported symbols so that

src/anthropic/_client.py

+6
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252

5353
class Anthropic(SyncAPIClient):
5454
completions: resources.Completions
55+
beta: resources.Beta
5556
with_raw_response: AnthropicWithRawResponse
5657

5758
# client options
@@ -126,6 +127,7 @@ def __init__(
126127
self._default_stream_cls = Stream
127128

128129
self.completions = resources.Completions(self)
130+
self.beta = resources.Beta(self)
129131
self.with_raw_response = AnthropicWithRawResponse(self)
130132

131133
@property
@@ -303,6 +305,7 @@ def _make_status_error(
303305

304306
class AsyncAnthropic(AsyncAPIClient):
305307
completions: resources.AsyncCompletions
308+
beta: resources.AsyncBeta
306309
with_raw_response: AsyncAnthropicWithRawResponse
307310

308311
# client options
@@ -377,6 +380,7 @@ def __init__(
377380
self._default_stream_cls = AsyncStream
378381

379382
self.completions = resources.AsyncCompletions(self)
383+
self.beta = resources.AsyncBeta(self)
380384
self.with_raw_response = AsyncAnthropicWithRawResponse(self)
381385

382386
@property
@@ -555,11 +559,13 @@ def _make_status_error(
555559
class AnthropicWithRawResponse:
556560
def __init__(self, client: Anthropic) -> None:
557561
self.completions = resources.CompletionsWithRawResponse(client.completions)
562+
self.beta = resources.BetaWithRawResponse(client.beta)
558563

559564

560565
class AsyncAnthropicWithRawResponse:
561566
def __init__(self, client: AsyncAnthropic) -> None:
562567
self.completions = resources.AsyncCompletionsWithRawResponse(client.completions)
568+
self.beta = resources.AsyncBetaWithRawResponse(client.beta)
563569

564570

565571
Client = Anthropic

src/anthropic/_response.py

+9-8
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@
55
import datetime
66
import functools
77
from typing import TYPE_CHECKING, Any, Union, Generic, TypeVar, Callable, cast
8-
from typing_extensions import Awaitable, ParamSpec, get_args, override, get_origin
8+
from typing_extensions import Awaitable, ParamSpec, override, get_origin
99

1010
import httpx
1111

1212
from ._types import NoneType, UnknownResponse, BinaryResponseContent
13-
from ._utils import is_given
13+
from ._utils import is_given, extract_type_var_from_base
1414
from ._models import BaseModel, is_basemodel
1515
from ._constants import RAW_RESPONSE_HEADER
1616
from ._exceptions import APIResponseValidationError
@@ -221,12 +221,13 @@ def __init__(self) -> None:
221221

222222

223223
def _extract_stream_chunk_type(stream_cls: type) -> type:
224-
args = get_args(stream_cls)
225-
if not args:
226-
raise TypeError(
227-
f"Expected stream_cls to have been given a generic type argument, e.g. Stream[Foo] but received {stream_cls}",
228-
)
229-
return cast(type, args[0])
224+
from ._base_client import Stream, AsyncStream
225+
226+
return extract_type_var_from_base(
227+
stream_cls,
228+
index=0,
229+
generic_bases=cast("tuple[type, ...]", (Stream, AsyncStream)),
230+
)
230231

231232

232233
def to_raw_response_wrapper(func: Callable[P, R]) -> Callable[P, APIResponse[R]]:

0 commit comments

Comments (0)