
Aim Security post-call guardrails support #8356

Merged
2 changes: 1 addition & 1 deletion docs/my-website/docs/proxy/guardrails/aim_security.md
@@ -37,7 +37,7 @@ guardrails:
- guardrail_name: aim-protected-app
litellm_params:
guardrail: aim
mode: pre_call # 'during_call' is also available
mode: [pre_call, post_call] # "during_call" is also available
api_key: os.environ/AIM_API_KEY
api_base: os.environ/AIM_API_BASE # Optional, use only when using a self-hosted Aim Outpost
```
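With `mode: [pre_call, post_call]`, Aim screens both the incoming prompt and the model's output, including streamed output. Below is a minimal client-side sketch of exercising this through the proxy; the base URL, virtual key, model name, and per-request `guardrails` field are placeholders that must match your own proxy setup (the per-request field is only needed if the guardrail is not applied by default).

```python
# Hypothetical client-side sketch: URL, key, and model name are placeholders.
import openai

client = openai.OpenAI(
    base_url="http://localhost:4000",  # your LiteLLM proxy
    api_key="sk-1234",                 # a virtual key issued by the proxy
)

try:
    # With post_call enabled, a streamed answer is verified chunk-by-chunk by Aim.
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Summarize our refund policy."}],
        stream=True,
        extra_body={"guardrails": ["aim-protected-app"]},  # per-request opt-in, if not set globally
    )
    for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)
except openai.APIError as e:
    # A post-call detection surfaces as an error instead of the model output.
    print(f"Blocked by guardrail: {e}")
```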
9 changes: 9 additions & 0 deletions litellm/integrations/custom_logger.py
@@ -251,6 +251,15 @@ async def async_post_call_streaming_hook(
) -> Any:
pass

async def async_post_call_streaming_iterator_hook(
self,
user_api_key_dict: UserAPIKeyAuth,
response: Any,
request_data: dict,
) -> Any:
async for item in response:
yield item

#### SINGLE-USE #### - https://docs.litellm.ai/docs/observability/custom_callback#using-your-custom-callback-function

def log_input_event(self, model, messages, kwargs, print_verbose, callback_func):
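The default implementation above simply passes chunks through. Here is a hedged sketch of a custom callback overriding the new hook to rewrite chunks in flight; the class name and redaction rule are invented, and only the hook signature comes from the change above.

```python
# Illustrative only: a custom logger that edits streamed chunks before the client sees them.
from typing import Any

from litellm.integrations.custom_logger import CustomLogger
from litellm.proxy._types import UserAPIKeyAuth


class RedactingLogger(CustomLogger):
    async def async_post_call_streaming_iterator_hook(
        self,
        user_api_key_dict: UserAPIKeyAuth,
        response: Any,
        request_data: dict,
    ) -> Any:
        # Wrap the upstream iterator and rewrite each chunk as it streams out.
        async for chunk in response:
            for choice in getattr(chunk, "choices", []) or []:
                delta = getattr(choice, "delta", None)
                if delta is not None and delta.content:
                    delta.content = delta.content.replace("ACME", "[REDACTED]")
            yield chunk
```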
106 changes: 93 additions & 13 deletions litellm/proxy/guardrails/guardrail_hooks/aim.py
@@ -4,11 +4,14 @@
# https://www.aim.security/
#
# +-------------------------------------------------------------+

import asyncio
import json
import os
from typing import Literal, Optional, Union
from typing import Any, Literal, Optional, Union

from fastapi import HTTPException
from pydantic import BaseModel
from websockets.asyncio.client import ClientConnection, connect

from litellm import DualCache
from litellm._logging import verbose_proxy_logger
@@ -18,29 +21,32 @@
httpxSpecialProvider,
)
from litellm.proxy._types import UserAPIKeyAuth
from litellm.proxy.proxy_server import StreamingCallbackError
from litellm.types.utils import (
Choices,
EmbeddingResponse,
ImageResponse,
ModelResponse,
ModelResponseStream,
)


class AimGuardrailMissingSecrets(Exception):
pass


class AimGuardrail(CustomGuardrail):
def __init__(
self, api_key: Optional[str] = None, api_base: Optional[str] = None, **kwargs
):
self.async_handler = get_async_httpx_client(
llm_provider=httpxSpecialProvider.GuardrailCallback
)
def __init__(self, api_key: Optional[str] = None, api_base: Optional[str] = None, **kwargs):
self.async_handler = get_async_httpx_client(llm_provider=httpxSpecialProvider.GuardrailCallback)
self.api_key = api_key or os.environ.get("AIM_API_KEY")
if not self.api_key:
msg = (
"Couldn't get Aim api key, either set the `AIM_API_KEY` in the environment or "
"pass it as a parameter to the guardrail in the config file"
)
raise AimGuardrailMissingSecrets(msg)
self.api_base = (
api_base or os.environ.get("AIM_API_BASE") or "https://api.aim.security"
)
self.api_base = api_base or os.environ.get("AIM_API_BASE") or "https://api.aim.security"
self.ws_api_base = self.api_base.replace("http://", "ws://").replace("https://", "wss://")
super().__init__(**kwargs)

async def async_pre_call_hook(
@@ -98,8 +104,82 @@ async def call_aim_guardrail(self, data: dict, hook: str) -> None:
detected = res["detected"]
verbose_proxy_logger.info(
"Aim: detected: {detected}, enabled policies: {policies}".format(
detected=detected, policies=list(res["details"].keys())
)
detected=detected,
policies=list(res["details"].keys()),
),
)
if detected:
raise HTTPException(status_code=400, detail=res["detection_message"])

async def call_aim_guardrail_on_output(self, request_data: dict, output: str, hook: str) -> Optional[str]:
user_email = request_data.get("metadata", {}).get("headers", {}).get("x-aim-user-email")
headers = {"Authorization": f"Bearer {self.api_key}", "x-aim-litellm-hook": hook} | (
{"x-aim-user-email": user_email} if user_email else {}
)
response = await self.async_handler.post(
f"{self.api_base}/detect/output",
headers=headers,
json={"output": output, "messages": request_data.get("messages", [])},
)
response.raise_for_status()
res = response.json()
detected = res["detected"]
verbose_proxy_logger.info(
"Aim: detected: {detected}, enabled policies: {policies}".format(
detected=detected,
policies=list(res["details"].keys()),
),
)
if detected:
return res["detection_message"]
return None

async def async_post_call_success_hook(
self,
data: dict,
user_api_key_dict: UserAPIKeyAuth,
response: Union[Any, ModelResponse, EmbeddingResponse, ImageResponse],
) -> Any:
if isinstance(response, ModelResponse) and response.choices and isinstance(response.choices[0], Choices):
content = response.choices[0].message.content or ""
detection = await self.call_aim_guardrail_on_output(data, content, hook="output")
if detection:
raise HTTPException(status_code=400, detail=detection)

async def async_post_call_streaming_iterator_hook(
self,
user_api_key_dict: UserAPIKeyAuth,
response,
request_data: dict,
) -> Any:
user_email = request_data.get("metadata", {}).get("headers", {}).get("x-aim-user-email")
headers = {
"Authorization": f"Bearer {self.api_key}",
} | ({"x-aim-user-email": user_email} if user_email else {})
async with connect(f"{self.ws_api_base}/detect/output/ws", additional_headers=headers) as websocket:
sender = asyncio.create_task(self.forward_the_stream_to_aim(websocket, response))
while True:
result = json.loads(await websocket.recv())
if verified_chunk := result.get("verified_chunk"):
yield ModelResponseStream.model_validate(verified_chunk)
else:
sender.cancel()
if result.get("done"):
return
if blocking_message := result.get("blocking_message"):
raise StreamingCallbackError(blocking_message)
verbose_proxy_logger.error(f"Unknown message received from AIM: {result}")
return

async def forward_the_stream_to_aim(
self,
websocket: ClientConnection,
response_iter,
) -> None:
async for chunk in response_iter:
if isinstance(chunk, BaseModel):
chunk = chunk.model_dump_json()
if isinstance(chunk, dict):
chunk = json.dumps(chunk)
await websocket.send(chunk)
await websocket.send(json.dumps({"done": True}))
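The streaming hook above speaks a small websocket exchange: serialized chunks go out, and the service replies with `verified_chunk`, `blocking_message`, or `done` messages. The toy server below mirrors only the message shapes inferred from that client code (using the same `websockets` asyncio API the PR imports); the real Aim Outpost endpoint, path handling, and detection behavior may differ.

```python
# Toy stand-in for the Aim output-detection websocket, for local experimentation only.
import asyncio
import json

from websockets.asyncio.server import serve  # websockets >= 13, same API family as the client import


async def handler(websocket):
    async for raw in websocket:
        message = json.loads(raw)
        if message.get("done"):
            # Client finished forwarding the stream and nothing was flagged.
            await websocket.send(json.dumps({"done": True}))
            break
        # Echo every chunk back as verified so the proxy forwards it to the caller.
        await websocket.send(json.dumps({"verified_chunk": message}))
        # A detection would instead send:
        # await websocket.send(json.dumps({"blocking_message": "Policy violation"}))


async def main():
    async with serve(handler, "localhost", 8765):
        await asyncio.get_running_loop().create_future()  # run until cancelled


if __name__ == "__main__":
    asyncio.run(main())
```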
30 changes: 20 additions & 10 deletions litellm/proxy/proxy_server.py
@@ -23,6 +23,11 @@
get_origin,
get_type_hints,
)
from litellm.types.utils import (
ModelResponse,
ModelResponseStream,
TextCompletionResponse,
)

if TYPE_CHECKING:
from opentelemetry.trace import Span as _Span
@@ -1374,6 +1379,10 @@ async def _run_background_health_check():
await asyncio.sleep(health_check_interval)


class StreamingCallbackError(Exception):
pass


class ProxyConfig:
"""
Abstraction class on top of config loading/updating logic. Gives us one place to control all config updating logic.
@@ -3035,8 +3044,7 @@ async def async_data_generator(
):
verbose_proxy_logger.debug("inside generator")
try:
time.time()
async for chunk in response:
async for chunk in proxy_logging_obj.async_post_call_streaming_iterator_hook(user_api_key_dict=user_api_key_dict, response=response, request_data=request_data):
verbose_proxy_logger.debug(
"async_data_generator: received streaming chunk - {}".format(chunk)
)
@@ -3073,6 +3081,8 @@ async def async_data_generator(

if isinstance(e, HTTPException):
raise e
elif isinstance(e, StreamingCallbackError):
error_msg = str(e)
else:
error_traceback = traceback.format_exc()
error_msg = f"{str(e)}\n\n{error_traceback}"
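`StreamingCallbackError` gives streaming hooks a way to abort a stream with a clean message (surfaced as the error string above) instead of a full traceback. A hypothetical guardrail using it is sketched below; the class, trigger word, and wiring are illustrative, and only the exception and hook signature come from this PR.

```python
# Hypothetical guardrail: aborts the SSE stream when a banned term appears in the output.
from typing import Any

from litellm.integrations.custom_guardrail import CustomGuardrail
from litellm.proxy._types import UserAPIKeyAuth
from litellm.proxy.proxy_server import StreamingCallbackError


class BannedWordGuardrail(CustomGuardrail):
    async def async_post_call_streaming_iterator_hook(
        self,
        user_api_key_dict: UserAPIKeyAuth,
        response: Any,
        request_data: dict,
    ) -> Any:
        async for chunk in response:
            for choice in getattr(chunk, "choices", []) or []:
                delta = getattr(choice, "delta", None)
                if delta is not None and delta.content and "classified" in delta.content.lower():
                    # Ends the stream; the proxy returns str(e) to the client.
                    raise StreamingCallbackError("Response blocked: restricted content detected")
            yield chunk
```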
@@ -5403,11 +5413,11 @@ async def token_counter(request: TokenCountRequest):
)
async def supported_openai_params(model: str):
"""
Returns supported openai params for a given litellm model name

e.g. `gpt-4` vs `gpt-3.5-turbo`

Example curl:
```
curl -X GET --location 'http://localhost:4000/utils/supported_openai_params?model=gpt-3.5-turbo-16k' \
--header 'Authorization: Bearer sk-1234'
@@ -6405,7 +6415,7 @@ async def model_group_info(
- /model_group/info returns all model groups. End users of proxy should use /model_group/info since those models will be used for /chat/completions, /embeddings, etc.
- /model_group/info?model_group=rerank-english-v3.0 returns all model groups for a specific model group (`model_name` in config.yaml)



Example Request (All Models):
```shell
@@ -6423,10 +6433,10 @@
-H 'Authorization: Bearer sk-1234'
```

Example Request (Specific Wildcard Model Group): (e.g. `model_name: openai/*` on config.yaml)
```shell
curl -X 'GET' \
'http://localhost:4000/model_group/info?model_group=openai/tts-1' \
-H 'accept: application/json' \
-H 'Authorization: Bearer sk-1234'
```
Expand Down Expand Up @@ -7531,7 +7541,7 @@ async def invitation_update(
):
"""
Update when invitation is accepted

```
curl -X POST 'http://localhost:4000/invitation/update' \
-H 'Content-Type: application/json' \
@@ -7592,7 +7602,7 @@ async def invitation_delete(
):
"""
Delete invitation link

```
curl -X POST 'http://localhost:4000/invitation/delete' \
-H 'Content-Type: application/json' \
34 changes: 32 additions & 2 deletions litellm/proxy/utils.py
@@ -18,6 +18,7 @@
ProxyErrorTypes,
ProxyException,
)
from litellm.types.guardrails import GuardrailEventHooks

try:
import backoff
@@ -31,7 +32,7 @@
import litellm
import litellm.litellm_core_utils
import litellm.litellm_core_utils.litellm_logging
from litellm import EmbeddingResponse, ImageResponse, ModelResponse, Router
from litellm import EmbeddingResponse, ImageResponse, ModelResponse, Router, ModelResponseStream
from litellm._logging import verbose_proxy_logger
from litellm._service_logger import ServiceLogging, ServiceTypes
from litellm.caching.caching import DualCache, RedisCache
@@ -972,7 +973,7 @@ async def async_post_call_streaming_hook(
1. /chat/completions
"""
response_str: Optional[str] = None
if isinstance(response, ModelResponse):
if isinstance(response, (ModelResponse, ModelResponseStream)):
response_str = litellm.get_response_string(response_obj=response)
if response_str is not None:
for callback in litellm.callbacks:
@@ -992,6 +993,35 @@
raise e
return response

def async_post_call_streaming_iterator_hook(
self,
response,
user_api_key_dict: UserAPIKeyAuth,
request_data: dict,
):
"""
Allow user to modify outgoing streaming data -> Given a whole response iterator.
This hook is best used when you need to modify multiple chunks of the response at once.

Covers:
1. /chat/completions
"""
for callback in litellm.callbacks:
_callback: Optional[CustomLogger] = None
if isinstance(callback, str):
_callback = litellm.litellm_core_utils.litellm_logging.get_custom_logger_compatible_class(callback)
else:
_callback = callback # type: ignore
if _callback is not None and isinstance(_callback, CustomLogger):
if not isinstance(_callback, CustomGuardrail) or _callback.should_run_guardrail(
data=request_data, event_type=GuardrailEventHooks.post_call
):
response = _callback.async_post_call_streaming_iterator_hook(
user_api_key_dict=user_api_key_dict, response=response, request_data=request_data
)
return response


async def post_call_streaming_hook(
self,
response: str,
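The new `ProxyLogging.async_post_call_streaming_iterator_hook` dispatcher above re-wraps the response iterator once per eligible callback, so hooks compose like nested async generators: each hook consumes the previous wrapper's output. A standalone sketch of that wrapping pattern, independent of LiteLLM:

```python
# Standalone illustration of the iterator-wrapping pattern used by the dispatcher above.
import asyncio
from typing import AsyncIterator


async def source() -> AsyncIterator[str]:
    for token in ["hello ", "world"]:
        yield token


async def uppercase_hook(stream: AsyncIterator[str]) -> AsyncIterator[str]:
    async for item in stream:
        yield item.upper()


async def tag_hook(stream: AsyncIterator[str]) -> AsyncIterator[str]:
    async for item in stream:
        yield f"[{item}]"


async def main() -> None:
    stream: AsyncIterator[str] = source()
    # Mirrors `response = _callback.async_post_call_streaming_iterator_hook(...)`
    # applied in a loop: each later hook wraps the one before it.
    for hook in (uppercase_hook, tag_hook):
        stream = hook(stream)
    async for item in stream:
        print(item, end="")  # -> [HELLO ][WORLD]


asyncio.run(main())
```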
2 changes: 1 addition & 1 deletion litellm/utils.py
@@ -3947,7 +3947,7 @@ def _count_characters(text: str) -> int:
return len(filtered_text)


def get_response_string(response_obj: ModelResponse) -> str:
def get_response_string(response_obj: Union[ModelResponse, ModelResponseStream]) -> str:
_choices: List[Union[Choices, StreamingChoices]] = response_obj.choices

response_str = ""