Skip to content

refactor(filter): optimize for filter #61

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 27, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
316 changes: 92 additions & 224 deletions resources/functions/openwebui_monitor.py
Original file line number Diff line number Diff line change
@@ -1,249 +1,117 @@
from typing import Optional, Callable, Any, Awaitable
from pydantic import Field, BaseModel
import requests
import time


TRANSLATIONS = {
"en": {
"network_request_failed": "Network request failed: {error}",
"request_failed": "Request failed: [{error_type}] {error_msg}",
"insufficient_balance": "Insufficient balance: Current balance `{balance:.4f}`",
"unknown_error": "Unknown error",
"api_key_invalid": "API key validation failed",
"cost": "Cost: ${cost:.4f}",
"balance": "Balance: ${balance:.4f}",
"tokens": "Tokens: {input}+{output}",
"time_spent": "Time: {time:.2f}s",
"tokens_per_sec": "{tokens_per_sec:.2f} T/s"
},
"zh": {
"network_request_failed": "网络请求失败: {error}",
"request_failed": "请求失败: [{error_type}] {error_msg}",
"insufficient_balance": "余额不足: 当前余额 `{balance:.4f}`",
"unknown_error": "未知错误",
"api_key_invalid": "API密钥验证失败",
"cost": "费用: ¥{cost:.4f}",
"balance": "余额: ¥{balance:.4f}",
"tokens": "Token: {input}+{output}",
"time_spent": "耗时: {time:.2f}s",
"tokens_per_sec": "{tokens_per_sec:.2f} T/s"
}
}
"""
title: Usage Monitor
author: VariantConst & OVINC CN
git_url: https://github.com/VariantConst/OpenWebUI-Monitor.git
version: 0.3.5
requirements: httpx
license: MIT
"""

import logging
from typing import Dict, Optional

from httpx import AsyncClient
from pydantic import BaseModel, Field

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


class CustomException(Exception):
pass


class Filter:
class Valves(BaseModel):
API_ENDPOINT: str = Field(
default="", description="The base URL for the API endpoint."
)
API_KEY: str = Field(default="", description="API key for authentication.")
priority: int = Field(
default=5, description="Priority level for the filter operations."
)
show_cost: bool = Field(default=True, description="Display cost information")
show_balance: bool = Field(
default=True, description="Display balance information"
)
show_spend_time: bool = Field(default=True, description="Display spend time")
show_tokens: bool = Field(default=True, description="Display token usage")
show_tokens_per_sec: bool = Field(
default=True, description="Display tokens per second"
)
language: str = Field(
default="en",
description="Language for messages (en/zh)"
)
api_endpoint: str = Field(default="", description="openwebui-monitor's base url")
api_key: str = Field(default="", description="openwebui-monitor's api key")
priority: int = Field(default=5, description="filter priority")

def __init__(self):
self.type = "filter"
self.name = "OpenWebUI Monitor"
self.valves = self.Valves()
self.outage = False
self.start_time = None
self.translations = TRANSLATIONS
self.inlet_temp = None

def get_text(self, key: str, **kwargs) -> str:
"""获取指定语言的文本"""
lang = self.valves.language
if lang not in self.translations:
lang = "en"
text = self.translations[lang].get(key, self.translations["en"][key])
return text.format(**kwargs) if kwargs else text

def _prepare_user_dict(self, __user__: dict) -> dict:
"""将 __user__ 对象转换为可序列化的字典"""
user_dict = dict(__user__)
if "valves" in user_dict and hasattr(user_dict["valves"], "model_dump"):
user_dict["valves"] = user_dict["valves"].model_dump()

return user_dict
def _prepare_body_dict(self, body: dict) -> dict:
"""将 body 对象转换为可序列化的字典"""
body_dict = dict(body)
if "model" in body_dict["metadata"] and hasattr(
body_dict["metadata"]["model"], "model_dump"
):
body_dict["metadata"]["model"] = body_dict["metadata"]["model"].model_dump()

return body_dict
def _modify_outlet_body(self, body: dict) -> dict:
body_modify = dict(body)
last_message = body_modify["messages"][-1]

if "info" not in last_message and self.inlet_temp is not None:
body_modify["messages"][:-1] = self.inlet_temp["messages"]
return body_modify

def inlet(
self, body: dict, user: Optional[dict] = None, __user__: dict = {}
) -> dict:
self.start_time = time.time()
self.outage_map: Dict[str, bool] = {}

try:
post_url = f"{self.valves.API_ENDPOINT}/api/v1/inlet"
headers = {"Authorization": f"Bearer {self.valves.API_KEY}"}

user_dict = self._prepare_user_dict(__user__)
body_dict = self._prepare_body_dict(body)
self.inlet_temp = body_dict
response = requests.post(
post_url, headers=headers, json={"user": user_dict, "body": body_dict}
)
async def request(self, client: AsyncClient, url: str, headers: dict, json: dict):
response = await client.post(url=url, headers=headers, json=json)
response.raise_for_status()
response_data = response.json()
if not response_data.get("success"):
logger.error("[usage_monitor] req monitor failed: %s", response_data)
raise CustomException("calculate usage failed, please contact administrator")
return response_data

if response.status_code == 401:
return body
async def inlet(self, body: dict, __metadata__: Optional[dict] = None, __user__: Optional[dict] = None) -> dict:
__user__ = __user__ or {}
__metadata__ = __metadata__ or {}
user_id = __user__["id"]

response.raise_for_status()
response_data = response.json()
client = AsyncClient()

if not response_data.get("success"):
error_msg = response_data.get("error", self.get_text("unknown_error"))
error_type = response_data.get("error_type", "UNKNOWN_ERROR")
raise Exception(self.get_text("request_failed", error_type=error_type, error_msg=error_msg))
try:
response_data = await self.request(
client=client,
url=f"{self.valves.api_endpoint}/api/v1/inlet",
headers={"Authorization": f"Bearer {self.valves.api_key}"},
json={"user": __user__, "body": body},
)
self.outage_map[user_id] = response_data.get("balance", 0) <= 0
if self.outage_map[user_id]:
logger.info("[usage_monitor] no balance: %s", user_id)
raise CustomException("no balance, please contact administrator")

self.outage = response_data.get("balance", 0) <= 0
if self.outage:
raise Exception(self.get_text("insufficient_balance", balance=response_data['balance']))
return body

except requests.exceptions.RequestException as e:
if (
isinstance(e, requests.exceptions.HTTPError)
and e.response.status_code == 401
):
return body
raise Exception(self.get_text("network_request_failed", error=str(e)))
except Exception as e:
raise Exception(f"处理请求时发生错误: {str(e)}")
except Exception as err:
logger.exception("[usage_monitor] error calculating usage: %s", err)
if isinstance(err, CustomException):
raise err
raise Exception(f"error calculating usage, {err}") from err

finally:
await client.aclose()

async def outlet(
self,
body: dict,
user: Optional[dict] = None,
__user__: dict = {},
__event_emitter__: Callable[[Any], Awaitable[None]] = None,
__metadata__: Optional[dict] = None,
__user__: Optional[dict] = None,
__event_emitter__: callable = None,
) -> dict:
if self.outage:
__user__ = __user__ or {}
__metadata__ = __metadata__ or {}
user_id = __user__["id"]

if self.outage_map[user_id]:
return body

client = AsyncClient()

try:
post_url = f"{self.valves.API_ENDPOINT}/api/v1/outlet"
headers = {"Authorization": f"Bearer {self.valves.API_KEY}"}

user_dict = self._prepare_user_dict(__user__)
body_modify = self._modify_outlet_body(body)

request_data = {
"user": user_dict,
"body": body_modify,
}

response = requests.post(post_url, headers=headers, json=request_data)

if response.status_code == 401:
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": "API密钥验证失败",
"done": True,
},
}
)
return body

response.raise_for_status()
result = response.json()

if not result.get("success"):
error_msg = result.get("error", "未知错误")
error_type = result.get("error_type", "UNKNOWN_ERROR")
raise Exception(f"请求失败: [{error_type}] {error_msg}")

input_tokens = result["inputTokens"]
output_tokens = result["outputTokens"]
total_cost = result["totalCost"]
new_balance = result["newBalance"]

stats_array = []

if self.valves.show_cost:
stats_array.append(self.get_text("cost", cost=total_cost))
if self.valves.show_balance:
stats_array.append(self.get_text("balance", balance=new_balance))
if self.valves.show_tokens:
stats_array.append(self.get_text("tokens", input=input_tokens, output=output_tokens))

if self.start_time and self.valves.show_spend_time:
elapsed_time = time.time() - self.start_time
stats_array.append(self.get_text("time_spent", time=elapsed_time))

if self.valves.show_tokens_per_sec:
stats_array.append(self.get_text("tokens_per_sec", tokens_per_sec=output_tokens/elapsed_time))

stats = " | ".join(stat for stat in stats_array)

if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": stats,
"done": True,
},
}
)
response_data = await self.request(
client=client,
url=f"{self.valves.api_endpoint}/api/v1/outlet",
headers={"Authorization": f"Bearer {self.valves.api_key}"},
json={"user": __user__, "body": body},
)

# pylint: disable=C0209
stats = " | ".join(
[
f"Tokens: {response_data['inputTokens']} + {response_data['outputTokens']}",
"Cost: %.4f" % response_data["totalCost"],
"Balance: %.4f" % response_data["newBalance"],
]
)

await __event_emitter__({"type": "status", "data": {"description": stats, "done": True}})

logger.info("usage_monitor: %s %s", user_id, stats)
return body

except requests.exceptions.RequestException as e:
if (
isinstance(e, requests.exceptions.HTTPError)
and e.response.status_code == 401
):
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": "API密钥验证失败",
"done": True,
},
}
)
return body
raise Exception(f"网络请求失败: {str(e)}")
except Exception as e:
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"错误: {str(e)}",
"done": True,
},
}
)
raise Exception(f"处理请求时发生错误: {str(e)}")
except Exception as err:
logger.exception("[usage_monitor] error calculating usage: %s", err)
raise Exception(f"error calculating usage, {err}") from err

finally:
await client.aclose()