Skip to content

Commit 3baaee2

Browse files
authored
Python: streaming agent response callback in agent orchestrations (#12360)
### Motivation and Context <!-- Thank you for your contribution to the semantic-kernel repo! Please help reviewers and future users, providing the following information: 1. Why is this change required? 2. What problem does it solve? 3. What scenario does it contribute to? 4. If it fixes an open issue, please link to the issue here. --> Issue: #12306 Discussion: #12302 Currently, we only have a **non-streaming** agent response callback for users to interact with the agents inside an orchestration. Support for a streaming callback has been requested by developers. ### Description <!-- Describe your changes, the overall approach, the underlying design. These notes will help understanding how your code works. Thanks! --> Add support for a streaming agent response callback to all orchestrations. ### Contribution Checklist <!-- Before submitting this PR, please make sure: --> - [x] The code builds clean without any errors or warnings - [x] The PR follows the [SK Contribution Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md) and the [pre-submission formatting script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts) raises no violations - [x] All unit tests pass, and I have added new tests where possible - [x] I didn't break anyone 😄
1 parent c822511 commit 3baaee2

File tree

14 files changed

+771
-158
lines changed

14 files changed

+771
-158
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
# Copyright (c) Microsoft. All rights reserved.
2+
3+
import asyncio
4+
5+
from semantic_kernel.agents import Agent, ChatCompletionAgent, SequentialOrchestration
6+
from semantic_kernel.agents.runtime import InProcessRuntime
7+
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion
8+
from semantic_kernel.contents.streaming_chat_message_content import StreamingChatMessageContent
9+
10+
"""
11+
The following sample demonstrates how to create a sequential orchestration for
12+
executing multiple agents in sequence, i.e. the output of one agent is the input
13+
to the next agent.
14+
15+
This sample demonstrates the basic steps of creating and starting a runtime, creating
16+
a sequential orchestration, invoking the orchestration, and finally waiting for the
17+
results.
18+
"""
19+
20+
21+
def get_agents() -> list[Agent]:
    """Build the agents that take part in the sequential orchestration.

    Every agent is a ChatCompletionAgent backed by its own AzureChatCompletion
    service. Add or remove entries in the table below to change the pipeline.

    Returns:
        list[Agent]: The agents, in the order in which they will be executed.
    """
    # (name, instructions) pairs. The position in this list is the execution order.
    agent_specs = [
        (
            "ConceptExtractorAgent",
            (
                "You are a marketing analyst. Given a product description, identify:\n"
                "- Key features\n"
                "- Target audience\n"
                "- Unique selling points\n\n"
            ),
        ),
        (
            "WriterAgent",
            (
                "You are a marketing copywriter. Given a block of text describing features, audience, and USPs, "
                "compose a compelling marketing copy (like a newsletter section) that highlights these points. "
                "Output should be short (around 150 words), output just the copy as a single text block."
            ),
        ),
        (
            "FormatProofAgent",
            (
                "You are an editor. Given the draft copy, correct grammar, improve clarity, ensure consistent tone, "
                "give format and make it polished. Output the final improved copy as a single text block."
            ),
        ),
    ]

    return [
        ChatCompletionAgent(
            name=agent_name,
            instructions=agent_instructions,
            service=AzureChatCompletion(),
        )
        for agent_name, agent_instructions in agent_specs
    ]
56+
57+
58+
# Flag to indicate if a new message is being received
59+
is_new_message = True
60+
61+
62+
def streaming_agent_response_callback(message: StreamingChatMessageContent, is_final: bool) -> None:
    """Print streamed agent output chunk by chunk, with a header per message.

    Args:
        message (StreamingChatMessageContent): The chunk that was just streamed by an agent.
        is_final (bool): True when this chunk is the last one of the current message.
    """
    global is_new_message

    # The first chunk of a message is preceded by a header naming the agent.
    if is_new_message:
        print(f"# {message.name}")

    # Chunks are written back to back on one line and flushed immediately.
    print(message.content, end="", flush=True)

    # The last chunk terminates the line.
    if is_final:
        print()

    # The next chunk opens a new message exactly when this one was the final chunk.
    is_new_message = is_final
77+
78+
79+
async def main():
    """Run the sequential orchestration sample and print the streamed and final output."""
    # 1. Build the sequential orchestration from the sample agents and register
    #    the streaming callback so each agent's output is printed as it is
    #    produced.
    orchestration = SequentialOrchestration(
        members=get_agents(),
        streaming_agent_response_callback=streaming_agent_response_callback,
    )

    # 2. Create the in-process runtime and start it
    in_process_runtime = InProcessRuntime()
    in_process_runtime.start()

    # 3. Hand the task to the orchestration, running on that runtime
    pending_result = await orchestration.invoke(
        task="An eco-friendly stainless steel water bottle that keeps drinks cold for 24 hours",
        runtime=in_process_runtime,
    )

    # 4. Wait for the final result (at most 20 seconds) and print it
    final_output = await pending_result.get(timeout=20)
    print(f"***** Final Result *****\n{final_output}")

    # 5. Stop the runtime once it has become idle
    await in_process_runtime.stop_when_idle()
106+
107+
"""
108+
Sample output:
109+
# ConceptExtractorAgent
110+
**Key Features:**
111+
- Made from eco-friendly stainless steel
112+
- Insulation technology that keeps drinks cold for up to 24 hours
113+
- Reusable design, promoting sustainability
114+
- Possible variations in sizes and colors
115+
116+
**Target Audience:**
117+
- Environmentally conscious consumers
118+
- Active individuals and outdoor enthusiasts
119+
- Health-conscious individuals looking to stay hydrated
120+
- Students and professionals looking for stylish and functional drinkware
121+
122+
**Unique Selling Points:**
123+
- Combines eco-friendliness with high performance in temperature retention
124+
- Durable and reusable, reducing reliance on single-use plastics
125+
- Sleek design that appeals to modern aesthetics while being functional
126+
- Supporting sustainability initiatives through responsible manufacturing practices
127+
# WriterAgent
128+
Sip sustainably with our eco-friendly stainless steel water bottles, designed for the conscious consumer who values
129+
both performance and aesthetics. Our bottles feature advanced insulation technology that keeps your drinks icy cold
130+
for up to 24 hours, making them perfect for outdoor adventures, gym sessions, or a busy day at the office. Choose
131+
from various sizes and stunning colors to match your personal style while making a positive impact on the planet.
132+
Each reusable bottle helps reduce single-use plastics, supporting a cleaner, greener world. Join the movement toward
133+
sustainability without compromising on style or functionality. Stay hydrated, look great, and make a difference—get
134+
your eco-friendly water bottle today!
135+
# FormatProofAgent
136+
Sip sustainably with our eco-friendly stainless steel water bottles, designed for the conscious consumer who values
137+
both performance and aesthetics. Our bottles utilize advanced insulation technology to keep your beverages icy cold
138+
for up to 24 hours, making them perfect for outdoor adventures, gym sessions, or a busy day at the office.
139+
140+
Choose from a variety of sizes and stunning colors to match your personal style while positively impacting the
141+
planet. Each reusable bottle helps reduce single-use plastics, supporting a cleaner, greener world.
142+
143+
Join the movement towards sustainability without compromising on style or functionality. Stay hydrated, look great,
144+
and make a difference—get your eco-friendly water bottle today!
145+
***** Final Result *****
146+
Sip sustainably with our eco-friendly stainless steel water bottles, designed for the conscious consumer who values
147+
both performance and aesthetics. Our bottles utilize advanced insulation technology to keep your beverages icy cold
148+
for up to 24 hours, making them perfect for outdoor adventures, gym sessions, or a busy day at the office.
149+
150+
Choose from a variety of sizes and stunning colors to match your personal style while positively impacting the
151+
planet. Each reusable bottle helps reduce single-use plastics, supporting a cleaner, greener world.
152+
153+
Join the movement towards sustainability without compromising on style or functionality. Stay hydrated, look great,
154+
and make a difference—get your eco-friendly water bottle today!
155+
"""
156+
157+
158+
if __name__ == "__main__":
159+
asyncio.run(main())

python/semantic_kernel/agents/orchestration/agent_actor_base.py

Lines changed: 77 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from semantic_kernel.agents.orchestration.orchestration_base import DefaultTypeAlias
1111
from semantic_kernel.agents.runtime.core.message_context import MessageContext
1212
from semantic_kernel.agents.runtime.core.routed_agent import RoutedAgent
13-
from semantic_kernel.contents.chat_history import ChatHistory
13+
from semantic_kernel.contents import ChatHistory, ChatMessageContent, StreamingChatMessageContent
1414
from semantic_kernel.utils.feature_stage_decorator import experimental
1515

1616
if sys.version_info >= (3, 12):
@@ -44,18 +44,23 @@ def __init__(
4444
agent: Agent,
4545
internal_topic_type: str,
4646
agent_response_callback: Callable[[DefaultTypeAlias], Awaitable[None] | None] | None = None,
47+
streaming_agent_response_callback: Callable[[StreamingChatMessageContent, bool], Awaitable[None] | None]
48+
| None = None,
4749
) -> None:
4850
"""Initialize the agent container.
4951
5052
Args:
5153
agent (Agent): An agent to be run in the container.
5254
internal_topic_type (str): The topic type of the internal topic.
53-
agent_response_callback (Callable | None): A function that is called when a response is produced
55+
agent_response_callback (Callable | None): A function that is called when a full response is produced
5456
by the agents.
57+
streaming_agent_response_callback (Callable | None): A function that is called when a streaming response
58+
is produced by the agents.
5559
"""
5660
self._agent = agent
5761
self._internal_topic_type = internal_topic_type
5862
self._agent_response_callback = agent_response_callback
63+
self._streaming_agent_response_callback = streaming_agent_response_callback
5964

6065
self._agent_thread: AgentThread | None = None
6166
# Chat history to temporarily store messages before the agent thread is created
@@ -69,9 +74,78 @@ async def _call_agent_response_callback(self, message: DefaultTypeAlias) -> None
6974
Args:
7075
message (DefaultTypeAlias): The message to be sent to the agent_response_callback.
7176
"""
72-
# TODO(@taochen): Support streaming
7377
if self._agent_response_callback:
7478
if inspect.iscoroutinefunction(self._agent_response_callback):
7579
await self._agent_response_callback(message)
7680
else:
7781
self._agent_response_callback(message)
82+
83+
async def _call_streaming_agent_response_callback(
    self,
    message_chunk: StreamingChatMessageContent,
    is_final: bool,
) -> None:
    """Call the streaming_agent_response_callback function if it is set.

    The callback may be synchronous or asynchronous. It is always invoked first,
    and its result is awaited whenever that result is awaitable. This also covers
    callbacks that are not coroutine functions but still return an awaitable
    (for example an object with an ``async def __call__``, or a plain function
    that returns a coroutine), which would otherwise never be awaited.

    Args:
        message_chunk (StreamingChatMessageContent): The message chunk.
        is_final (bool): Whether this is the final chunk of the response.
    """
    callback = self._streaming_agent_response_callback
    if not callback:
        return

    result = callback(message_chunk, is_final)
    # Decide based on what the callback returned, not on how it was declared.
    if inspect.isawaitable(result):
        await result
99+
100+
async def _invoke_agent(self, additional_messages: DefaultTypeAlias | None = None, **kwargs) -> ChatMessageContent:
    """Stream a response from the agent and return it as one combined message.

    Every chunk is forwarded to the streaming callback; only the last chunk is
    flagged with ``is_final=True``. The combined message is passed to the
    non-streaming agent response callback before it is returned.

    Args:
        additional_messages (DefaultTypeAlias | None): Additional messages to be sent to the agent.
        **kwargs: Additional keyword arguments to be passed to the agent's invoke method:
            - kernel: The kernel to use for the agent invocation.

    Returns:
        ChatMessageContent: The full response, built by adding all streamed chunks together.

    Raises:
        RuntimeError: If the agent did not produce any chunk.
    """
    outgoing = self._create_messages(additional_messages)
    chunks: list[StreamingChatMessageContent] = []

    async for item in self._agent.invoke_stream(messages=outgoing, thread=self._agent_thread, **kwargs):  # type: ignore[arg-type]
        chunk = item.message
        # A chunk is only known to be non-final once its successor has arrived,
        # so the previously received chunk is emitted one step late.
        if chunks:
            await self._call_streaming_agent_response_callback(chunks[-1], is_final=False)
        chunks.append(chunk)
        # Keep the thread from the response if no thread has been stored yet.
        if self._agent_thread is None:
            self._agent_thread = item.thread

    if not chunks:
        raise RuntimeError(f'Agent "{self._agent.name}" did not return any response.')

    # The stream is exhausted, so the newest chunk is the final one.
    await self._call_streaming_agent_response_callback(chunks[-1], is_final=True)

    # Add the remaining chunks onto the first one to get the full response.
    first_chunk, *remaining_chunks = chunks
    combined = sum(remaining_chunks, first_chunk)
    await self._call_agent_response_callback(combined)

    return combined
134+
135+
def _create_messages(self, additional_messages: DefaultTypeAlias | None = None) -> list[ChatMessageContent]:
    """Assemble the messages to send to the agent on the next invocation.

    The buffered chat history is included only while no agent thread exists.
    Any additional messages are placed after it.

    Args:
        additional_messages (DefaultTypeAlias | None): A single message or a list of messages
            to send in addition to the buffered history, or None for no extra messages.

    Returns:
        list[ChatMessageContent]: A new list of messages to be sent to the agent.
    """
    outgoing: list[ChatMessageContent] = []

    # Without a thread, the buffered history has to accompany the request.
    if self._agent_thread is None:
        outgoing.extend(self._chat_history.messages)

    if additional_messages is None:
        return outgoing

    if isinstance(additional_messages, list):
        outgoing.extend(additional_messages)
    else:
        outgoing.append(additional_messages)
    return outgoing

python/semantic_kernel/agents/orchestration/concurrent.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from semantic_kernel.agents.runtime.core.routed_agent import message_handler
2121
from semantic_kernel.agents.runtime.core.topic import TopicId
2222
from semantic_kernel.agents.runtime.in_process.type_subscription import TypeSubscription
23+
from semantic_kernel.contents.streaming_chat_message_content import StreamingChatMessageContent
2324
from semantic_kernel.kernel_pydantic import KernelBaseModel
2425
from semantic_kernel.utils.feature_stage_decorator import experimental
2526

@@ -56,38 +57,38 @@ def __init__(
5657
internal_topic_type: str,
5758
collection_agent_type: str,
5859
agent_response_callback: Callable[[DefaultTypeAlias], Awaitable[None] | None] | None = None,
60+
streaming_agent_response_callback: Callable[[StreamingChatMessageContent, bool], Awaitable[None] | None]
61+
| None = None,
5962
) -> None:
6063
"""Initialize the agent actor.
6164
6265
Args:
6366
agent: The agent to be executed.
6467
internal_topic_type: The internal topic type for the actor.
6568
collection_agent_type: The collection agent type for the actor.
66-
agent_response_callback: A callback function to handle the response from the agent.
69+
agent_response_callback: A callback function to handle the full response from the agent.
70+
streaming_agent_response_callback: A callback function to handle streaming responses from the agent.
6771
"""
6872
self._collection_agent_type = collection_agent_type
6973
super().__init__(
7074
agent=agent,
7175
internal_topic_type=internal_topic_type,
7276
agent_response_callback=agent_response_callback,
77+
streaming_agent_response_callback=streaming_agent_response_callback,
7378
)
7479

7580
@message_handler
7681
async def _handle_message(self, message: ConcurrentRequestMessage, ctx: MessageContext) -> None:
7782
"""Handle a message."""
7883
logger.debug(f"Concurrent actor (Actor ID: {self.id}; Agent name: {self._agent.name}) started processing...")
7984

80-
response = await self._agent.get_response(
81-
messages=message.body, # type: ignore[arg-type]
82-
)
85+
response = await self._invoke_agent(additional_messages=message.body)
8386

8487
logger.debug(f"Concurrent actor (Actor ID: {self.id}; Agent name: {self._agent.name}) finished processing.")
8588

86-
await self._call_agent_response_callback(response.message)
87-
8889
target_actor_id = await self.runtime.get(self._collection_agent_type)
8990
await self.send_message(
90-
ConcurrentResponseMessage(body=response.message),
91+
ConcurrentResponseMessage(body=response),
9192
target_actor_id,
9293
cancellation_token=ctx.cancellation_token,
9394
)
@@ -181,6 +182,7 @@ async def _internal_helper(agent: Agent) -> None:
181182
internal_topic_type,
182183
collection_agent_type=self._get_collection_actor_type(internal_topic_type),
183184
agent_response_callback=self._agent_response_callback,
185+
streaming_agent_response_callback=self._streaming_agent_response_callback,
184186
),
185187
)
186188

0 commit comments

Comments
 (0)