Parent Issue
Part of #500 — depends on #501, used by #502
Context
The existing chat.py endpoint calls agent.chat(user_message) synchronously and returns a complete string response. For live interactive sessions, the frontend needs token-by-token streaming so the UI updates as the model generates. This issue builds a streaming adapter that wraps the Anthropic SDK streaming API and emits typed events for the WebSocket relay.
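On the consumer side, the WebSocket relay can forward each typed event as a JSON frame as it arrives. A minimal sketch (the `relay_events` helper, the dict-shaped events, and the `send` callable are illustrative stand-ins, not part of this issue's API):

```python
import asyncio
import json
from typing import AsyncIterator


async def relay_events(events: AsyncIterator[dict], send) -> None:
    """Forward each typed chat event to the WebSocket as a JSON frame."""
    async for event in events:
        # Drop None-valued fields so frames stay small on the wire.
        frame = {k: v for k, v in event.items() if v is not None}
        await send(json.dumps(frame))


# Illustrative usage with a fake event stream standing in for the adapter.
async def _demo() -> list[str]:
    async def fake_stream():
        yield {"type": "text_delta", "content": "Hel", "tool_name": None}
        yield {"type": "text_delta", "content": "lo", "tool_name": None}
        yield {"type": "done", "content": None, "tool_name": None}

    frames: list[str] = []

    async def send(frame: str) -> None:
        frames.append(frame)

    await relay_events(fake_stream(), send)
    return frames


frames = asyncio.run(_demo())
```

Sending one frame per event (rather than buffering) is what lets the UI update token by token.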
Existing Code to Build On
- `codeframe/core/adapters/claude_code.py` - existing Claude Code adapter (subprocess-based)
- `codeframe/adapters/llm/` - existing LLM adapter layer
- `codeframe/core/streaming.py` - `EventPublisher` async event distribution pattern
What to Build
New file: `codeframe/core/adapters/streaming_chat.py`

```python
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import AsyncIterator


class ChatEventType(str, Enum):
    TEXT_DELTA = "text_delta"
    TOOL_USE_START = "tool_use_start"
    TOOL_RESULT = "tool_result"
    THINKING = "thinking"
    COST_UPDATE = "cost_update"
    DONE = "done"
    ERROR = "error"


@dataclass
class ChatEvent:
    type: ChatEventType
    content: str | None = None
    tool_name: str | None = None
    tool_input: dict | None = None
    cost_usd: float | None = None
    input_tokens: int | None = None
    output_tokens: int | None = None


class StreamingChatAdapter:
    """
    Wraps the Anthropic SDK streaming API.

    Yields ChatEvent objects as the model generates.
    Supports interrupt via asyncio.Event.
    Persists messages to the session_messages table after each turn.
    """

    def __init__(self, session_id: str, model: str, db_repo):
        ...

    async def send_message(
        self,
        content: str,
        history: list[dict],
        interrupt_event: asyncio.Event | None = None,
    ) -> AsyncIterator[ChatEvent]:
        """
        Stream a single turn.

        Uses anthropic.AsyncAnthropic().messages.stream().
        Yields a ChatEvent per SDK event type:
          - RawContentBlockDeltaEvent → TEXT_DELTA or THINKING
          - RawContentBlockStartEvent (tool_use) → TOOL_USE_START
          - tool execution result → TOOL_RESULT
          - MessageStopEvent → COST_UPDATE + DONE

        Checks interrupt_event between chunks; if set, stops and closes the stream.
        Persists the user message and the complete assistant turn to session_messages.
        """
        ...
```

Tool execution
For interactive sessions, run a limited safe tool set (read files, search, list directory). Do not execute shell commands or write files without explicit user confirmation — keep the scope conservative for v1.
Tools to support initially:
- `Read` (read a file)
- `Glob` (find files by pattern)
- `Grep` (search file contents)
Each tool call yields TOOL_USE_START → executes → yields TOOL_RESULT.
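The three read-only tools could be sketched as plain functions like the following (the `tool_*` names and return shapes are assumptions for illustration, not a settled API):

```python
import fnmatch
import os
import re


def tool_read(path: str) -> str:
    """Read a file's contents (read-only; no writes in v1)."""
    with open(path, "r", encoding="utf-8") as f:
        return f.read()


def tool_glob(root: str, pattern: str) -> list[str]:
    """Find files under root whose names match a glob pattern."""
    matches = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            if fnmatch.fnmatch(name, pattern):
                matches.append(os.path.join(dirpath, name))
    return sorted(matches)


def tool_grep(path: str, pattern: str) -> list[tuple[int, str]]:
    """Search a file's contents; return (line_number, line) pairs."""
    regex = re.compile(pattern)
    hits = []
    with open(path, "r", encoding="utf-8") as f:
        for lineno, line in enumerate(f, start=1):
            if regex.search(line):
                hits.append((lineno, line.rstrip("\n")))
    return hits


# Illustrative check against a throwaway temp file.
import tempfile

tmpdir = tempfile.mkdtemp()
sample = os.path.join(tmpdir, "hello.py")
with open(sample, "w", encoding="utf-8") as f:
    f.write("print('hi')\n# TODO: cleanup\n")
found = tool_glob(tmpdir, "*.py")
hits = tool_grep(sample, r"TODO")
```

Keeping these as pure functions makes the TOOL_USE_START → execute → TOOL_RESULT flow easy to unit-test without the SDK in the loop.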
Message history management
- Load `session_messages` from the DB at session start to reconstruct context
- Append the new user message before the API call
- Append the complete assistant response after the stream ends
- Truncate history if the context window approaches its limit (use `tiktoken`, already in deps)
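The truncation step could look roughly like this. The `truncate_history` helper is a sketch; the default token counter is a crude length-based stand-in so the example is self-contained, and the real adapter would pass a `tiktoken`-backed counter instead:

```python
def truncate_history(
    history: list[dict],
    max_tokens: int,
    count_tokens=lambda text: len(text) // 4,  # stand-in; use tiktoken in the adapter
) -> list[dict]:
    """Drop oldest messages until the estimated token total fits the budget.

    Always keeps the most recent message so the current turn survives.
    """
    kept = list(history)

    def total() -> int:
        return sum(count_tokens(m.get("content", "")) for m in kept)

    while len(kept) > 1 and total() > max_tokens:
        kept.pop(0)  # drop the oldest message first
    return kept


msgs = [
    {"role": "user", "content": "a" * 400},       # ~100 tokens by the stand-in count
    {"role": "assistant", "content": "b" * 400},  # ~100 tokens
    {"role": "user", "content": "c" * 40},        # ~10 tokens
]
trimmed = truncate_history(msgs, max_tokens=120)
```

Oldest-first dropping is the simplest policy; a later iteration could summarize dropped turns instead of discarding them.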
Acceptance Criteria
- [ ] `StreamingChatAdapter.send_message()` yields `TEXT_DELTA` events as tokens arrive (not buffered)
- [ ] Tool calls yield `TOOL_USE_START` with tool name + input, then `TOOL_RESULT` with output
- [ ] `THINKING` events emitted when extended thinking is enabled
- [ ] `COST_UPDATE` event emitted at turn end with correct token counts
- [ ] `interrupt_event.set()` causes the stream to stop within 1 turn
- [ ] Messages persisted to `session_messages` after each complete turn
- [ ] History reconstructed correctly from the DB on adapter init
- [ ] Unit tests in `tests/core/test_streaming_chat.py` using a mocked Anthropic client
Notes
- Use `anthropic.AsyncAnthropic()` - do not use the synchronous client
- Model defaults to `claude-sonnet-4-6` if not specified in the session
- Check the `ANTHROPIC_API_KEY` env var; raise a clear error if missing