How to service agents over HTTP #126

Open

rahilvora opened this issue Mar 13, 2025 · 7 comments
Labels
question Question about using the SDK

Comments

@rahilvora

Please read this first

  • Have you read the docs? Agents SDK docs
  • Have you searched for related issues? Others may have had similar requests

Question

I am looking at this documentation and it helps me understand how I can create multiple agents and add tools to them.

But I am curious how I can create an application server that serves my agents (created using the SDK) over HTTP, so that different frontend apps can communicate with them.

Can someone point me to appropriate resources/examples that can help me understand the serving side of an agent?

@rahilvora rahilvora added the question Question about using the SDK label Mar 13, 2025
@rm-openai
Collaborator

Here's an example I found on the web that uses FastAPI + the OpenAI SDK: https://blog.pamelafox.org/2024/01/using-fastapi-for-openai-chat-backend.html

You can sub out the parts that use the OpenAI SDK for code that uses the Agents SDK.
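
A minimal sketch of that substitution (the /chat endpoint and request shape here are illustrative, not from the blog post):

from fastapi import FastAPI
from pydantic import BaseModel

from agents import Agent, Runner

app = FastAPI()
agent = Agent(name="Assistant", instructions="You are a helpful assistant.")


class ChatRequest(BaseModel):
    message: str


@app.post("/chat")
async def chat(req: ChatRequest):
    # Runner.run drives the agent loop to completion and returns the result.
    result = await Runner.run(agent, req.message)
    return {"reply": result.final_output}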

We can also work on a sample like this in this repo!

@rahilvora
Author

rahilvora commented Mar 13, 2025

@rm-openai I can help with the sample. I can create one in this repo as a reference and you can review it? Should I create a new issue and start working on it?

@masterkain

masterkain commented Mar 13, 2025

Very basic example; I suggest reading the guide, because you then get handoffs, streaming, tool usage, etc.

# uv run python -m uvicorn main:app --reload

import os

os.environ["OPENAI_API_KEY"] = "sk-xxxx"

import openai
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from pydantic import BaseModel

app = FastAPI()


# ---------------------------
# Example 1: Using the New Responses API
# ---------------------------
@app.get("/responses")
async def responses_endpoint():
    # This example uses the Responses API with the built-in web search tool.
    # Note: this synchronous call inside an async endpoint will block the event
    # loop; the async client is preferable for a real server.
    response = openai.responses.create(
        model="gpt-4o-mini",  # a model supporting the Responses API
        input="What is a positive news story from today?",
        tools=[{"type": "web_search_preview"}],
    )
    return {"result": response.output_text}


# ---------------------------
# Define a Pydantic model for the outline checker's output.
# ---------------------------
class OutlineCheckerOutput(BaseModel):
    good_quality: bool
    is_scifi: bool


# ---------------------------
# Example 2: Using the New Agents SDK for a Multi-Agent Workflow
# ---------------------------
from agents import Agent, Runner, trace

# Agent to generate a story outline.
story_outline_agent = Agent(name="story_outline_agent", instructions="Generate a short sci-fi story outline based on the user's prompt.")

# Agent to check the outline's quality, now using a proper Pydantic model.
outline_checker_agent = Agent(
    name="outline_checker_agent",
    instructions=("Check if the story outline is of good quality and if it is a sci-fi story. " "Return a JSON object with keys 'good_quality' (boolean) and 'is_scifi' (boolean)."),
    output_type=OutlineCheckerOutput,
)

# Agent to write the final story based on the approved outline.
story_writer_agent = Agent(name="story_writer_agent", instructions="Write a complete short story based on the given outline.")


@app.get("/agents")
async def agents_endpoint(prompt: str):
    with trace("Multi-agent workflow"):
        # Step 1: Generate the outline.
        outline_result = await Runner.run(story_outline_agent, prompt)
        # Step 2: Check the outline quality.
        checker_result = await Runner.run(outline_checker_agent, outline_result.final_output)
        checker_output = checker_result.final_output
        if not (checker_output.good_quality and checker_output.is_scifi):
            return {"result": "Outline did not pass quality check. Workflow stopped."}
        # Step 3: Write the final story.
        story_result = await Runner.run(story_writer_agent, outline_result.final_output)
        return {"result": story_result.final_output}


# ---------------------------
# Home Page: A simple HTML interface
# ---------------------------
@app.get("/")
async def home():
    html_content = """
    <html>
        <head>
            <title>OpenAI Responses & Agents Demo</title>
        </head>
        <body>
            <h1>OpenAI Responses & Agents Demo</h1>
            <ul>
                <li><a href="/responses">Test Responses API</a></li>
                <li>
                    <form action="/agents" method="get">
                        <input name="prompt" type="text" placeholder="Enter story prompt" />
                        <button type="submit">Run Agents Workflow</button>
                    </form>
                </li>
            </ul>
        </body>
    </html>
    """
    return HTMLResponse(content=html_content)
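
To try it, start the server with the command at the top of the file, then hit the endpoints (assuming uvicorn's default port 8000):

curl "http://127.0.0.1:8000/responses"
curl "http://127.0.0.1:8000/agents?prompt=a%20robot%20discovers%20music"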

@rahilvora
Author

@rm-openai Created PR for this #168

@isaac47

isaac47 commented Mar 18, 2025

Hi,

I'm trying to stream the output of an agent, similar to how OpenAI does it, to serve a FastAPI endpoint, but the following code doesn't work as expected:

import json

# agent is created here

result = Runner.run_streamed(agent, "tell me a joke.")

async for event in result.stream_events():
    # note: the serialized string below is built but never yielded or sent
    json.dumps(event.model_dump(), ensure_ascii=False) + "\n"

I want to receive and process each streamed event progressively, but I'm not getting any output. How can I properly implement streaming in this case?

Any guidance would be appreciated. Thanks!

@TimoVink

TimoVink commented Mar 18, 2025

There are different ways to return an event stream. Personally I use SSE in my experiments. Looks something like:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from agents import Runner, Agent
import json

app = FastAPI()

def _sse_response(generator):
    async def _wrapper():
        async for event in generator:
            yield f'data: {json.dumps(event)}\n\n'
    result = StreamingResponse(
        _wrapper(),
        media_type='text/event-stream',
        headers={ 'Content-Encoding': 'none' }
    )
    return result

async def _agent_stream(agent, prompt):
    result = Runner.run_streamed(agent, input=prompt, max_turns=32)
    async for event in result.stream_events():
        # Handle whatever events you're interested in here, `yield` JSON-serializable objects
        if event.type == 'raw_response_event':
            if event.data.type == 'response.output_text.delta':
                yield {
                    'type': 'output_delta',
                    'delta': event.data.delta,
                }

def _agent_response(agent, prompt):
    return _sse_response(_agent_stream(agent, prompt))

@app.get('/api/haiku')
def get_haiku(prompt: str):
    agent = Agent(name="Haiku", instructions="Write a Haiku about the given topic.")
    return _agent_response(agent, prompt)

Note that the events emitted by Runner.run_streamed are not JSON serializable. The events and their properties appear to be a mix of dataclasses and Pydantic models.

In the example above I pick out the events I care about (just response.output_text.delta) and emit my own dicts. Alternatively you could write a custom JSONEncoder or other mechanism to serialize the events.
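
A sketch of that custom-encoder alternative (illustrative; the stream events mix dataclasses and Pydantic models, so it handles both):

import dataclasses
import json

from pydantic import BaseModel


class EventEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, BaseModel):
            return o.model_dump()
        if dataclasses.is_dataclass(o) and not isinstance(o, type):
            # json.dumps calls default() again on any nested models inside
            return dataclasses.asdict(o)
        return super().default(o)


# usage inside the SSE wrapper above:
#   yield f'data: {json.dumps(event, cls=EventEncoder)}\n\n'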

The output for this endpoint looks something like:
(screenshot of the streamed SSE output omitted)

@vincenzodomina
Contributor

I fiddled around for the last few days to get streaming and tool calling working as described above, with a custom frontend and a custom REST endpoint, but the streaming events are not straightforward (...to me, yet, but to be fair they are much more capable; great work!)

Matching the Responses API stream events to their handling on the frontend side was quite some work, and it seems to need a proper standardized, reliable approach à la the Vercel AI SDK.

In my backend, I tried to just re-stream event.data as-is over HTTP as SSE, but I had to add some custom mapping depending on the event type because, as mentioned above, the events have different types and are not all JSON serializable as-is.
My frontend uses assistant-ui; more info on that implementation here: assistant-ui/assistant-ui#1748

So far it works for me with multiple tools, but it feels fragile and hacky in that I need to use different levels of event types (raw response events, or the higher-level tool_output agent event) to get tool-calling data, and I have to use workarounds to associate tool call arguments with their outputs, because the OpenAI Agents SDK events do not provide all the necessary data in every event (e.g. tool_output does not carry the call_id, so how do I know which tool it came from? I can work around it as below, but imagine parallel tool calls).

Please share if you can make it better, or know who is taking a stab at making this more standardized.

E.g. I saw Vercel already added the OpenAI Responses API to their AI SDK, but this would also be good for the Agents SDK, for use in a custom backend with a custom REST endpoint!

Backend:

# Fragment from inside an async generator feeding an SSE response. The
# Response* event classes come from openai.types.responses; raw_item_type_addon
# and raw_item are computed elsewhere in the handler and are omitted here.
result = Runner.run_streamed(agent, input=messages)

async for event in result.stream_events():
    if event.type == "raw_response_event":
        if isinstance(event.data, ResponseCompletedEvent):
            yield {
                "type": event.data.type,
                "data": event.data.response.model_dump(),
            }
        elif isinstance(
            event.data, (ResponseOutputItemAddedEvent, ResponseOutputItemDoneEvent)
        ):
            yield {
                "type": event.data.type + "." + event.data.item.type,
                "data": event.data.item.model_dump(),
            }
        elif isinstance(
            event.data,
            (
                ResponseTextAnnotationDeltaEvent,
                ResponseTextDeltaEvent,
                ResponseTextDoneEvent,
                ResponseFunctionCallArgumentsDeltaEvent,
                ResponseFunctionCallArgumentsDoneEvent,
            ),
        ):
            yield {"type": event.data.type, "data": event.data.model_dump()}

        ...

    elif event.type == "run_item_stream_event":
        if event.name == "tool_called":
            yield {
                "type": event.name + raw_item_type_addon,
                "data": event.item.raw_item.model_dump(),
            }
        elif event.name == "tool_output":
            yield {
                "type": event.name + raw_item_type_addon,
                "data": {
                    **raw_item,
                    "output": event.item.output,
                },
            }

        ...

Frontend:


"use client";

import { useState } from "react";
import {
  CompositeAttachmentAdapter,
  SimpleImageAttachmentAdapter,
  SimpleTextAttachmentAdapter,
} from "@assistant-ui/react";

import { ThreadMessageLike } from "@assistant-ui/react";
import { AppendMessage } from "@assistant-ui/react";
import {
  AssistantRuntimeProvider,
  useExternalStoreRuntime,
} from "@assistant-ui/react";
import { config } from "@/config";
import { useMessages, ThreadMessageRecord } from "./MessagesProvider";
import { useAgent } from "./AgentProvider";

interface RuntimeProviderProps {
  children: React.ReactNode;
  initialMessages?: ThreadMessageLike[];
  memory?: string;
  threadId?: string;
  refreshThreadList?: () => void;
}

export function RuntimeProvider({
  children,
  refreshThreadList,
}: RuntimeProviderProps) {
  const [isRunning, setIsRunning] = useState(false);
  const { getMessages, getMessagesAsInput, appendMessage, updateMessage, setMessages } = useMessages();
  const { selectedAgent } = useAgent();

  const onNew = async (message: AppendMessage) => {
    if (message.content[0]?.type !== 'text') throw new Error('Only text messages are supported');

    const newMessage: ThreadMessageLike = {
      role: 'user',
      content: message.content[0].text
    };
    const newMessageRecord: ThreadMessageRecord = {
      id: new Date().getTime().toString(),
      ...newMessage
    };
    appendMessage(newMessageRecord);
    setIsRunning(true);

    const inputMessages = [...getMessagesAsInput(), newMessage];
    const response = await fetch(`${config.apiUrl}/agents/run`, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
      },
      body: JSON.stringify({
        agent_id: selectedAgent?.slug,
        messages: inputMessages
      }),
    });

    if (!response.ok || !response.body) {
      throw new Error("Failed to get response from API");
    }

    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let currentResponse = '';
    let currentMessage: ThreadMessageRecord = {
      id: new Date().getTime().toString(),
      role: 'assistant',
      content: ''
    };
    let toolCalls: Record<string, any> = {};
    let currentToolCallId: string = '';
    let currentToolCallName: string = '';
    let currentToolCallArguments: string = '';

    if (!reader) {
      throw new Error('No reader found');
    }

    try {
      // First add the empty assistant message
      appendMessage(currentMessage);

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        const chunk = decoder.decode(value, { stream: true });
        // Parse SSE format (data: {...}\n\n). Caveat: a network chunk can end
        // mid-event; a robust parser would buffer incomplete events across reads.
        const events = chunk.split('\n\n').filter(Boolean);

        for (const event of events) {
          let data;
          try {
            const jsonStr = event.split('data: ')[1];
            if (!jsonStr) {
              console.warn('Received empty or invalid SSE data');
              continue;
            }
            data = JSON.parse(jsonStr);
          } catch (error) {
            console.warn('Failed to parse SSE data:', error);
            console.warn('Problematic event:', event);
            continue;
          }

          // Skip if we couldn't parse the data
          if (!data) continue;

          const toolEventTypes = [
            'response.output_item.added.function_call',
            'response.function_call_arguments.delta',
            'response.function_call_arguments.done',
            // 'response.output_item.added.web_search_call',
            // 'response.output_item.done.web_search_call',
            'tool_output'
          ];
          const isToolEvent: boolean = toolEventTypes.includes(data?.type);
          // For debugging
          // console.log(`[Chat service] Streaming data: `, data);

          if (data?.type === 'agent_updated_stream_event') {
            // Agent name that started
          }
          if (data?.type === 'response.output_item.added.web_search_call') {
            currentResponse += '\n\n🌐 Searching the web...\n\n';
          }
          if (data?.type === 'response.output_item.done.web_search_call') {
            currentResponse = currentResponse.replace('\n\n🌐 Searching the web...\n\n', '\n\n🌐 Searched the web\n\n');
          }
          if (data?.type === 'response.output_text.delta') {
            currentResponse += data?.data?.delta || '';
          }
          // Handle tool call events
          if (data?.type === 'response.output_item.added.function_call') {
            currentToolCallId = data?.data?.call_id || '';
            currentToolCallName = data?.data?.name || '';
          }
          if (data?.type === 'response.function_call_arguments.delta') {
            currentToolCallArguments += data?.data?.delta || '';
          }
          if (data?.type === 'response.function_call_arguments.done') {
            // If not streaming: currentToolCallArguments += data?.data?.arguments || '';
          }
          if (isToolEvent) {
            let toolCallArgs: any | undefined = undefined;
            try {
              toolCallArgs = JSON.parse(currentToolCallArguments);
            } catch (e: any) {
              // noop
            }
            toolCalls[currentToolCallId] = {
              ...(toolCalls?.[currentToolCallId] || {}),
              type: 'tool-call',
              toolCallId: currentToolCallId,
              toolName: currentToolCallName,
              args: toolCallArgs || currentToolCallArguments,
              argsText: currentToolCallArguments,
              result: data?.data?.output || undefined,
            };
          }
          // Update message with tool calls and response
          currentMessage = {
            ...currentMessage,
            content: [
              ...(Object.values(toolCalls)?.length ? Object.values(toolCalls) : []),
              {
                type: 'text',
                text: currentResponse,
              },
            ],
          };
          // Reset tool call variables
          if (data?.type === 'tool_output') {
            currentToolCallId = '';
            currentToolCallName = '';
            currentToolCallArguments = '';
          }
          // Write message to store
          updateMessage(currentMessage.id, currentMessage);
        }
      }
    } catch (error) {
      console.error("Error reading stream:", error);
    } finally {
      reader.releaseLock();
      setIsRunning(false);
      refreshThreadList?.();
    }
  };

  const runtime = useExternalStoreRuntime<ThreadMessageLike>({
    isRunning,
    messages: getMessages(),
    setMessages: setMessages,
    onNew,
    adapters: {
      attachments: new CompositeAttachmentAdapter([
        new SimpleImageAttachmentAdapter(),
        new SimpleTextAttachmentAdapter(),
      ]),
    },
    convertMessage: (message) => message,
  });

  return (
    <AssistantRuntimeProvider runtime={runtime}>
      {children}
    </AssistantRuntimeProvider>
  );
}
