@@ -1,13 +1,15 @@
 import copy
 import json
 import uuid
-from typing import Union
+from abc import ABC, abstractmethod
+from typing import Callable, Dict, Union
+from urllib.parse import urljoin
 
 import structlog
 from fastapi.responses import JSONResponse, StreamingResponse
 from litellm import ModelResponse
 from litellm.types.utils import Delta, StreamingChoices
-from ollama import ChatResponse
+from ollama import ChatResponse, GenerateResponse
 
 from codegate.db import models as db_models
 from codegate.muxing import rulematcher
@@ -34,7 +36,7 @@ def _get_provider_formatted_url(self, model_route: rulematcher.ModelRoute) -> str:
             db_models.ProviderType.openai,
             db_models.ProviderType.openrouter,
         ]:
-            return f"{model_route.endpoint.endpoint}/v1"
+            return urljoin(model_route.endpoint.endpoint, "/v1")
         return model_route.endpoint.endpoint
 
     def set_destination_info(self, model_route: rulematcher.ModelRoute, data: dict) -> dict:
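A note on the `urljoin` change above: unlike plain string concatenation, `urljoin` normalizes the slash between the endpoint and the suffix, but a rooted suffix such as `"/v1"` also replaces any path already present on the stored endpoint. A quick sketch with made-up endpoint URLs:

```python
from urllib.parse import urljoin

# A trailing slash no longer produces a double slash:
assert f"{'https://api.openai.com/'}/v1" == "https://api.openai.com//v1"  # old behavior
assert urljoin("https://api.openai.com/", "/v1") == "https://api.openai.com/v1"

# Caveat: a rooted "/v1" discards any path already on the endpoint.
assert urljoin("https://example.com/proxy/openai", "/v1") == "https://example.com/v1"
```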
@@ -45,15 +47,101 @@ def set_destination_info(self, model_route: rulematcher.ModelRoute, data: dict) -> dict:
         return new_data
 
 
-class StreamChunkFormatter:
+class OutputFormatter(ABC):
+
+    @property
+    @abstractmethod
+    def provider_format_funcs(self) -> Dict[str, Callable]:
+        """
+        Return the provider-specific format functions. All provider format functions should
+        return the chunk in OpenAI format.
+        """
+        pass
+
+    @abstractmethod
+    def format(
+        self, response: Union[StreamingResponse, JSONResponse], dest_prov: db_models.ProviderType
+    ) -> Union[StreamingResponse, JSONResponse]:
+        """Format the response to the client."""
+        pass
+
+
+class StreamChunkFormatter(OutputFormatter):
     """
     Format a single chunk from a stream to OpenAI format.
     We need to configure the client to expect the OpenAI format.
     In Continue this means setting "provider": "openai" in the config json file.
     """
 
-    def __init__(self):
-        self.provider_to_func = {
+    @property
+    @abstractmethod
+    def provider_format_funcs(self) -> Dict[str, Callable]:
+        """
+        Return the provider-specific format functions. All provider format functions should
+        return the chunk in OpenAI format.
+        """
+        pass
+
+    def _format_openai(self, chunk: str) -> str:
+        """
+        The chunk is already in OpenAI format. To standardize, remove the "data:" prefix.
+
+        This function is used by both the chat and FIM formatters.
+        """
+        cleaned_chunk = chunk.split("data:")[1].strip()
+        return cleaned_chunk
+
+    def _format_as_openai_chunk(self, formatted_chunk: str) -> str:
+        """Format the chunk as an OpenAI chunk. This is the format in which clients expect the data."""
+        chunk_to_send = f"data:{formatted_chunk}\n\n"
+        return chunk_to_send
+
+    async def _format_streaming_response(
+        self, response: StreamingResponse, dest_prov: db_models.ProviderType
+    ):
+        """Format the streaming response to OpenAI format."""
+        format_func = self.provider_format_funcs.get(dest_prov)
+        openai_chunk = None
+        try:
+            async for chunk in response.body_iterator:
+                openai_chunk = format_func(chunk)
+                # Sometimes for Anthropic we couldn't get content from the chunk. Skip it.
+                if not openai_chunk:
+                    continue
+                yield self._format_as_openai_chunk(openai_chunk)
+        except Exception as e:
+            logger.error(f"Error sending chunk in muxing: {e}")
+            yield self._format_as_openai_chunk(str(e))
+        finally:
+            # Make sure the last chunk is always [DONE]
+            if openai_chunk and "[DONE]" not in openai_chunk:
+                yield self._format_as_openai_chunk("[DONE]")
+
+    def format(
+        self, response: StreamingResponse, dest_prov: db_models.ProviderType
+    ) -> StreamingResponse:
+        """Format the response to the client."""
+        return StreamingResponse(
+            self._format_streaming_response(response, dest_prov),
+            status_code=response.status_code,
+            headers=response.headers,
+            background=response.background,
+            media_type=response.media_type,
+        )
+
+
+class ChatStreamChunkFormatter(StreamChunkFormatter):
+    """
+    Format a single chunk from a stream to OpenAI format given that the request was a chat.
+    """
+
+    @property
+    def provider_format_funcs(self) -> Dict[str, Callable]:
+        """
+        Return the provider-specific format functions. All provider format functions should
+        return the chunk in OpenAI format.
+        """
+        return {
             db_models.ProviderType.ollama: self._format_ollama,
             db_models.ProviderType.openai: self._format_openai,
             db_models.ProviderType.anthropic: self._format_antropic,
@@ -68,21 +156,11 @@ def _format_ollama(self, chunk: str) -> str:
         try:
             chunk_dict = json.loads(chunk)
             ollama_chunk = ChatResponse(**chunk_dict)
-            open_ai_chunk = OLlamaToModel.normalize_chunk(ollama_chunk)
+            open_ai_chunk = OLlamaToModel.normalize_chat_chunk(ollama_chunk)
             return open_ai_chunk.model_dump_json(exclude_none=True, exclude_unset=True)
         except Exception:
             return chunk
 
-    def _format_openai(self, chunk: str) -> str:
-        """The chunk is already in OpenAI format. To standarize remove the "data:" prefix."""
-        cleaned_chunk = chunk.split("data:")[1].strip()
-        try:
-            chunk_dict = json.loads(cleaned_chunk)
-            open_ai_chunk = ModelResponse(**chunk_dict)
-            return open_ai_chunk.model_dump_json(exclude_none=True, exclude_unset=True)
-        except Exception:
-            return cleaned_chunk
-
     def _format_antropic(self, chunk: str) -> str:
         """Format the Anthropic chunk to OpenAI format."""
         cleaned_chunk = chunk.split("data:")[1].strip()
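For context on the `"data:"` handling in `_format_openai` and `_format_antropic`: streaming providers frame each chunk as a server-sent-events line of the form `data: <json>`. The formatters strip that prefix to reach the JSON payload, normalize it, and `_format_as_openai_chunk` re-applies the framing before the chunk goes back to the client. A rough sketch with an invented chunk payload:

```python
# Hypothetical SSE chunk as it arrives from an OpenAI-compatible provider.
raw_chunk = 'data: {"id":"chunk-1","choices":[{"delta":{"content":"Hi"}}]}'

# What _format_openai does: drop the SSE framing, keep the JSON payload.
payload = raw_chunk.split("data:")[1].strip()

# What _format_as_openai_chunk does: re-frame the normalized payload for the client.
chunk_to_send = f"data:{payload}\n\n"
```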
@@ -119,46 +197,53 @@ def _format_antropic(self, chunk: str) -> str:
         except Exception:
             return cleaned_chunk.strip()
 
-    def format(self, chunk: str, dest_prov: db_models.ProviderType) -> ModelResponse:
-        """Format the chunk to OpenAI format."""
-        # Get the format function
-        format_func = self.provider_to_func.get(dest_prov)
-        if format_func is None:
-            raise MuxingAdapterError(f"Provider {dest_prov} not supported.")
-        return format_func(chunk)
 
+class FimStreamChunkFormatter(StreamChunkFormatter):
 
-class ResponseAdapter:
+    @property
+    def provider_format_funcs(self) -> Dict[str, Callable]:
+        """
+        Return the provider-specific format functions. All provider format functions should
+        return the chunk in OpenAI format.
+        """
+        return {
+            db_models.ProviderType.ollama: self._format_ollama,
+            db_models.ProviderType.openai: self._format_openai,
+            # Our llamacpp provider emits OpenAI chunks
+            db_models.ProviderType.llamacpp: self._format_openai,
+            # OpenRouter is a dialect of OpenAI
+            db_models.ProviderType.openrouter: self._format_openai,
+        }
+
+    def _format_ollama(self, chunk: str) -> str:
+        """Format the Ollama chunk to OpenAI format."""
+        try:
+            chunk_dict = json.loads(chunk)
+            ollama_chunk = GenerateResponse(**chunk_dict)
+            open_ai_chunk = OLlamaToModel.normalize_fim_chunk(ollama_chunk)
+            return json.dumps(open_ai_chunk, separators=(",", ":"), indent=None)
+        except Exception:
+            return chunk
 
-    def __init__(self):
-        self.stream_formatter = StreamChunkFormatter()
 
-    def _format_as_openai_chunk(self, formatted_chunk: str) -> str:
-        """Format the chunk as OpenAI chunk. This is the format how the clients expect the data."""
-        return f"data:{formatted_chunk}\n\n"
+class ResponseAdapter:
 
-    async def _format_streaming_response(
-        self, response: StreamingResponse, dest_prov: db_models.ProviderType
-    ):
-        """Format the streaming response to OpenAI format."""
-        async for chunk in response.body_iterator:
-            openai_chunk = self.stream_formatter.format(chunk, dest_prov)
-            # Sometimes for Anthropic we couldn't get content from the chunk. Skip it.
-            if not openai_chunk:
-                continue
-            yield self._format_as_openai_chunk(openai_chunk)
+    def _get_formatter(
+        self, response: Union[StreamingResponse, JSONResponse], is_fim_request: bool
+    ) -> OutputFormatter:
+        """Get the formatter based on the request type."""
+        if isinstance(response, StreamingResponse):
+            if is_fim_request:
+                return FimStreamChunkFormatter()
+            return ChatStreamChunkFormatter()
+        raise MuxingAdapterError("Only streaming responses are supported.")
 
     def format_response_to_client(
-        self, response: Union[StreamingResponse, JSONResponse], dest_prov: db_models.ProviderType
+        self,
+        response: Union[StreamingResponse, JSONResponse],
+        dest_prov: db_models.ProviderType,
+        is_fim_request: bool,
     ) -> Union[StreamingResponse, JSONResponse]:
         """Format the response to the client."""
-        if isinstance(response, StreamingResponse):
-            return StreamingResponse(
-                self._format_streaming_response(response, dest_prov),
-                status_code=response.status_code,
-                headers=response.headers,
-                background=response.background,
-                media_type=response.media_type,
-            )
-        else:
-            raise MuxingAdapterError("Only streaming responses are supported.")
+        stream_formatter = self._get_formatter(response, is_fim_request)
+        return stream_formatter.format(response, dest_prov)
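Taken together, the refactor changes the caller's contract: `format_response_to_client` now takes an `is_fim_request` flag and picks the right formatter internally. A minimal usage sketch, assuming `muxed_response` and `muxed_fim_response` are `StreamingResponse` objects already returned by the destination provider:

```python
from codegate.db import models as db_models

adapter = ResponseAdapter()

# Chat completions keep the chat formatter...
client_response = adapter.format_response_to_client(
    muxed_response,  # StreamingResponse from the destination provider (assumed)
    db_models.ProviderType.ollama,
    is_fim_request=False,
)

# ...while fill-in-the-middle requests now get their own formatter, which
# normalizes Ollama GenerateResponse chunks instead of ChatResponse ones.
fim_response = adapter.format_response_to_client(
    muxed_fim_response,  # StreamingResponse for a FIM request (assumed)
    db_models.ProviderType.ollama,
    is_fim_request=True,
)
```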