Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(functions): restore language translation, and token/s display #67

Merged
merged 1 commit into from
Mar 24, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 65 additions & 25 deletions resources/functions/openwebui_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
"""

import logging
import time
from typing import Dict, Optional

from httpx import AsyncClient
from pydantic import BaseModel, Field
import json
Expand All @@ -18,6 +18,27 @@
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

TRANSLATIONS = {
"en": {
"request_failed": "Request failed: {error_msg}",
"insufficient_balance": "Insufficient balance: Current balance `{balance:.4f}`",
"cost": "Cost: ${cost:.4f}",
"balance": "Balance: ${balance:.4f}",
"tokens": "Tokens: {input}+{output}",
"time_spent": "Time: {time:.2f}s",
"tokens_per_sec": "{tokens_per_sec:.2f} T/s",
},
"zh": {
"request_failed": "请求失败: {error_msg}",
"insufficient_balance": "余额不足: 当前余额 `{balance:.4f}`",
"cost": "费用: ¥{cost:.4f}",
"balance": "余额: ¥{balance:.4f}",
"tokens": "Token: {input}+{output}",
"time_spent": "耗时: {time:.2f}s",
"tokens_per_sec": "{tokens_per_sec:.2f} T/s",
},
}


class CustomException(Exception):
pass
Expand All @@ -28,27 +49,41 @@ class Valves(BaseModel):
api_endpoint: str = Field(default="", description="openwebui-monitor's base url")
api_key: str = Field(default="", description="openwebui-monitor's api key")
priority: int = Field(default=5, description="filter priority")
language: str = Field(default="zh", description="language (en/zh)")
show_time_spent: bool = Field(default=True, description="show time spent")
show_tokens_per_sec: bool = Field(default=True, description="show tokens per second")
show_cost: bool = Field(default=True, description="show cost")
show_balance: bool = Field(default=True, description="show balance")
show_tokens: bool = Field(default=True, description="show tokens")

def __init__(self):
self.type = "filter"
self.name = "OpenWebUI Monitor"
self.valves = self.Valves()
self.outage_map: Dict[str, bool] = {}

self.start_time: Optional[float] = None

def get_text(self, key: str, **kwargs) -> str:
lang = self.valves.language if self.valves.language in TRANSLATIONS else "en"
text = TRANSLATIONS[lang].get(key, TRANSLATIONS["en"][key])
return text.format(**kwargs) if kwargs else text

async def request(self, client: AsyncClient, url: str, headers: dict, json_data: dict):
json_data = json.loads(json.dumps(json_data, default=lambda o: o.dict() if hasattr(o, "dict") else str(o)))

response = await client.post(url=url, headers=headers, json=json_data)
response.raise_for_status()
response_data = response.json()
if not response_data.get("success"):
logger.error("[usage_monitor] req monitor failed: %s", response_data)
raise CustomException("calculate usage failed, please contact administrator")
logger.error(self.get_text("request_failed", error_msg=response_data))
raise CustomException(self.get_text("request_failed", error_msg=response_data))
return response_data

async def inlet(self, body: dict, __metadata__: Optional[dict] = None, __user__: Optional[dict] = None) -> dict:
__user__ = __user__ or {}
__metadata__ = __metadata__ or {}
user_id = __user__["id"]
self.start_time = time.time()
user_id = __user__.get("id", "default")

client = AsyncClient()

Expand All @@ -61,13 +96,12 @@ async def inlet(self, body: dict, __metadata__: Optional[dict] = None, __user__:
)
self.outage_map[user_id] = response_data.get("balance", 0) <= 0
if self.outage_map[user_id]:
logger.info("[usage_monitor] no balance: %s", user_id)
raise CustomException("no balance, please contact administrator")

logger.info(self.get_text("insufficient_balance", balance=response_data.get("balance", 0)))
raise CustomException(self.get_text("insufficient_balance", balance=response_data.get("balance", 0)))
return body

except Exception as err:
logger.exception("[usage_monitor] error calculating usage: %s", err)
logger.exception(self.get_text("request_failed", error_msg=err))
if isinstance(err, CustomException):
raise err
raise Exception(f"error calculating usage, {err}") from err
Expand All @@ -80,13 +114,13 @@ async def outlet(
body: dict,
__metadata__: Optional[dict] = None,
__user__: Optional[dict] = None,
__event_emitter__: callable = None,
__event_emitter__: Optional[callable] = None,
) -> dict:
__user__ = __user__ or {}
__metadata__ = __metadata__ or {}
user_id = __user__["id"]
user_id = __user__.get("id", "default")

if self.outage_map[user_id]:
if self.outage_map.get(user_id, False):
return body

client = AsyncClient()
Expand All @@ -99,23 +133,29 @@ async def outlet(
json_data={"user": __user__, "body": body},
)

# pylint: disable=C0209
stats = " | ".join(
[
f"Tokens: {response_data['inputTokens']} + {response_data['outputTokens']}",
"Cost: %.4f" % response_data["totalCost"],
"Balance: %.4f" % response_data["newBalance"],
]
)

await __event_emitter__({"type": "status", "data": {"description": stats, "done": True}})
stats_list = []
if self.valves.show_tokens:
stats_list.append(self.get_text("tokens", input=response_data["inputTokens"], output=response_data["outputTokens"]))
if self.valves.show_cost:
stats_list.append(self.get_text("cost", cost=response_data["totalCost"]))
if self.valves.show_balance:
stats_list.append(self.get_text("balance", balance=response_data["newBalance"]))
if self.start_time and self.valves.show_time_spent:
elapsed = time.time() - self.start_time
stats_list.append(self.get_text("time_spent", time=elapsed))
if self.valves.show_tokens_per_sec:
tokens_per_sec = (response_data["outputTokens"] / elapsed if elapsed > 0 else 0)
stats_list.append(self.get_text("tokens_per_sec", tokens_per_sec=tokens_per_sec))

stats = " | ".join(stats_list)
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"description": stats, "done": True}})

logger.info("usage_monitor: %s %s", user_id, stats)
return body

except Exception as err:
logger.exception("[usage_monitor] error calculating usage: %s", err)
raise Exception(f"error calculating usage, {err}") from err

logger.exception(self.get_text("request_failed", error_msg=err))
raise Exception(self.get_text("request_failed", error_msg=err))
finally:
await client.aclose()
Loading